root/trunk/pdns/pdns/pdns_recursor.cc @ 843

Revision 843, 52.1 KB (checked in by ahu, 7 years ago)

make query chaining select on type as well..

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2006  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#ifndef WIN32
20# include <netdb.h>
21# include <unistd.h>
22#else
23 #include "ntservice.hh"
24 #include "recursorservice.hh"
25#endif // WIN32
26
27#include "utility.hh"
28#include <iostream>
29#include <errno.h>
30#include <map>
31#include <set>
32#include "recursor_cache.hh"
33#include <stdio.h>
34#include <signal.h>
35#include <stdlib.h>
36
37#include "mtasker.hh"
38#include <utility>
39#include "arguments.hh"
40#include "syncres.hh"
41#include <fcntl.h>
42#include <fstream>
43#include "sstuff.hh"
44#include <boost/tuple/tuple.hpp>
45#include <boost/tuple/tuple_comparison.hpp>
46#include <boost/shared_array.hpp>
47#include <boost/lexical_cast.hpp>
48#include <boost/function.hpp>
49#include <boost/algorithm/string.hpp>
50#include "dnsparser.hh"
51#include "dnswriter.hh"
52#include "dnsrecords.hh"
53#include "zoneparser-tng.hh"
54#include "rec_channel.hh"
55#include "logger.hh"
56#include "iputils.hh"
57#include "mplexer.hh"
58#include "config.h"
59
60#ifndef RECURSOR
61#include "statbag.hh"
62StatBag S;
63#endif
64
65FDMultiplexer* g_fdm;
66unsigned int g_maxTCPPerClient;
67bool g_logCommonErrors;
68using namespace boost;
69
70#ifdef __FreeBSD__           // see cvstrac ticket #26
71#include <pthread.h>
72#include <semaphore.h>
73#endif
74
75MemRecursorCache RC;
76RecursorStats g_stats;
77bool g_quiet;
78NetmaskGroup* g_allowFrom;
79string s_programname="pdns_recursor";
80typedef vector<int> g_tcpListenSockets_t;
81g_tcpListenSockets_t g_tcpListenSockets;
82int g_tcpTimeout;
83
84struct DNSComboWriter {
85  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), d_tcp(false), d_socket(-1)
86  {}
87  MOADNSParser d_mdp;
88  void setRemote(ComboAddress* sa)
89  {
90    d_remote=*sa;
91  }
92
93  void setSocket(int sock)
94  {
95    d_socket=sock;
96  }
97
98  string getRemote() const
99  {
100    return d_remote.toString();
101  }
102
103  struct timeval d_now;
104  ComboAddress d_remote;
105  bool d_tcp;
106  int d_socket;
107};
108
109
110#ifndef WIN32
111#ifndef __FreeBSD__
// Do-nothing definitions of the semaphore and pthread mutex entry points.
// Each one reports success (0) and touches none of its arguments.
extern "C" {
  int sem_init(sem_t*, int, unsigned int) { return 0; }
  int sem_wait(sem_t*) { return 0; }
  int sem_trywait(sem_t*) { return 0; }
  int sem_post(sem_t*) { return 0; }
  int sem_getvalue(sem_t*, int*) { return 0; }

  pthread_t pthread_self(void) { return (pthread_t) 0; }

  int pthread_mutex_init(pthread_mutex_t*, const pthread_mutexattr_t*) { return 0; }
  int pthread_mutex_lock(pthread_mutex_t*) { return 0; }
  int pthread_mutex_unlock(pthread_mutex_t*) { return 0; }
  int pthread_mutex_destroy(pthread_mutex_t*) { return 0; }
}
124#endif // __FreeBSD__
125#endif // WIN32
126
127ArgvMap &arg()
128{
129  static ArgvMap theArg;
130  return theArg;
131}
132
133struct timeval g_now;
134typedef vector<int> tcpserversocks_t;
135
136typedef MTasker<PacketID,string> MT_t;
137MT_t* MT;
138
139
140void handleTCPClientWritable(int fd, boost::any& var);
141
142// -1 is error, 0 is timeout, 1 is success
143int asendtcp(const string& data, Socket* sock) 
144{
145  PacketID pident;
146  pident.sock=sock;
147  pident.outMSG=data;
148
149  g_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
150  string packet;
151
152  int ret=MT->waitEvent(pident,&packet,1);
153  if(!ret || ret==-1) { // timeout
154    g_fdm->removeWriteFD(sock->getHandle());
155  }
156  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
157    return -1;
158  }
159  return ret;
160}
161
162void handleTCPClientReadable(int fd, boost::any& var);
163
164// -1 is error, 0 is timeout, 1 is success
165int arecvtcp(string& data, int len, Socket* sock) 
166{
167  data.clear();
168  PacketID pident;
169  pident.sock=sock;
170  pident.inNeeded=len;
171  g_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
172
173  int ret=MT->waitEvent(pident,&data,1);
174  if(!ret || ret==-1) { // timeout
175    g_fdm->removeReadFD(sock->getHandle());
176  }
177  else if(data.empty()) {// error, EOF or other
178    return -1;
179  }
180
181  return ret;
182}
183
184// returns -1 for errors which might go away, throws for ones that won't
185int makeClientSocket(int family)
186{
187  int ret=(int)socket(family, SOCK_DGRAM, 0);
188  if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
189    return ret;
190
191  if(ret<0) 
192    throw AhuException("Making a socket for resolver: "+stringerror());
193
194  static optional<ComboAddress> sin4;
195  if(!sin4) {
196    sin4=ComboAddress(::arg()["query-local-address"]);
197  }
198  static optional<ComboAddress> sin6;
199  if(!sin6) {
200    if(!::arg()["query-local-address6"].empty())
201    sin6=ComboAddress(::arg()["query-local-address6"]);
202  }
203
204  int tries=10;
205  while(--tries) {
206    uint16_t port=1025+Utility::random()%64510;
207    if(tries==1)  // fall back to kernel 'random'
208        port=0;
209
210    if(family==AF_INET) {
211      sin4->sin4.sin_port = htons(port); 
212     
213      if (::bind(ret, (struct sockaddr *)&*sin4, sin4->getSocklen()) >= 0) 
214        break;
215    }
216    else {
217      sin6->sin6.sin6_port = htons(port); 
218     
219      if (::bind(ret, (struct sockaddr *)&*sin6, sin6->getSocklen()) >= 0) 
220        break;
221    }
222  }
223  if(!tries)
224    throw AhuException("Resolver binding to local query client socket: "+stringerror());
225
226  Utility::setNonBlocking(ret);
227  return ret;
228}
229
230void handleUDPServerResponse(int fd, boost::any&);
231
232// you can ask this class for a UDP socket to send a query from
233// this socket is not yours, don't even think about deleting it
234// but after you call 'returnSocket' on it, don't assume anything anymore
235class UDPClientSocks
236{
237  unsigned int d_numsocks;
238  unsigned int d_maxsocks;
239
240public:
241  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
242  {
243  }
244
245  typedef set<int> socks_t;
246  socks_t d_socks;
247
248  // returning -1 means: temporary OS error (ie, out of files)
249  int getSocket(uint16_t family)
250  {
251    int fd=makeClientSocket(family);
252    if(fd < 0) // temporary error - receive exception otherwise
253      return -1;
254
255    d_socks.insert(fd);
256    d_numsocks++;
257    return fd;
258  }
259
260  void returnSocket(int fd)
261  {
262    socks_t::iterator i=d_socks.find(fd);
263    if(i==d_socks.end()) {
264      throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
265    }
266    returnSocket(i);
267  }
268
269  // return a socket to the pool, or simply erase it
270  void returnSocket(socks_t::iterator& i)
271  {
272    if(i==d_socks.end()) {
273      throw AhuException("Trying to return a socket not in the pool");
274    }
275    g_fdm->removeReadFD(*i);
276    Utility::closesocket(*i);
277   
278    d_socks.erase(i++);
279    --d_numsocks;
280  }
281}g_udpclientsocks;
282
283
284/* these two functions are used by LWRes */
285// -2 is OS error, -1 is error that depends on the remote, > 0 is success
286int asendto(const char *data, int len, int flags, 
287            const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd) 
288{
289
290  PacketID pident;
291  pident.domain = domain;
292  pident.remote = toaddr;
293  pident.type = qtype;
294
295  // see if there is an existing outstanding request we can chain on to, using partial equivalence function
296  pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
297
298  for(; chain.first != chain.second; chain.first++) {
299    if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
300      //      cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
301      // cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
302      // <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
303     
304      chain.first->key.chain.insert(id); // we can chain
305      *fd=-1;                            // gets used in waitEvent / sendEvent later on
306      return 1;
307    }
308  }
309
310  *fd=g_udpclientsocks.getSocket(toaddr.sin4.sin_family);
311  if(*fd < 0)
312    return -2;
313
314  pident.fd=*fd;
315  pident.id=id;
316 
317  int ret=connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen());
318  if(ret < 0) {
319    if(errno==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
320      return -2;
321    return ret;
322  }
323
324  g_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
325  return send(*fd, data, len, 0);
326}
327
328// -1 is error, 0 is timeout, 1 is success
329int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len, 
330              uint16_t id, const string& domain, uint16_t qtype, int fd)
331{
332  static optional<unsigned int> nearMissLimit;
333  if(!nearMissLimit) 
334    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
335
336  PacketID pident;
337  pident.fd=fd;
338  pident.id=id;
339  pident.domain=domain;
340  pident.type = qtype;
341  pident.remote=fromaddr;
342
343  string packet;
344  int ret=MT->waitEvent(pident, &packet, 1);
345
346  if(ret > 0) {
347    if(packet.empty()) // means "error"
348      return -1; 
349
350    *d_len=(int)packet.size();
351    memcpy(data,packet.c_str(),min(len,*d_len));
352    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
353      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
354      g_stats.spoofCount++;
355      return -1;
356    }
357  }
358  else {
359    if(fd >= 0)
360      g_udpclientsocks.returnSocket(fd);
361  }
362  return ret;
363}
364
365void setBuffer(int fd, int optname, uint32_t size)
366{
367  uint32_t psize=0;
368  socklen_t len=sizeof(psize);
369 
370  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
371    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
372    return; 
373  }
374
375  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
376    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
377}
378
379
380static void setReceiveBuffer(int fd, uint32_t size)
381{
382  setBuffer(fd, SO_RCVBUF, size);
383}
384
385static void setSendBuffer(int fd, uint32_t size)
386{
387  setBuffer(fd, SO_SNDBUF, size);
388}
389
390static void writePid(void)
391{
392  string fname=::arg()["socket-dir"]+"/"+s_programname+".pid";
393  ofstream of(fname.c_str());
394  if(of)
395    of<< Utility::getpid() <<endl;
396  else
397    L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
398}
399
400void primeHints(void)
401{
402  // prime root cache
403  set<DNSResourceRecord>nsset;
404
405  if(::arg()["hint-file"].empty()) {
406    static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
407                       "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
408    DNSResourceRecord arr, nsrr;
409    arr.qtype=QType::A;
410    arr.ttl=time(0)+3600000;
411    nsrr.qtype=QType::NS;
412    nsrr.ttl=time(0)+3600000;
413   
414    for(char c='a';c<='m';++c) {
415      static char templ[40];
416      strncpy(templ,"a.root-servers.net.", sizeof(templ) - 1);
417      *templ=c;
418      arr.qname=nsrr.content=templ;
419      arr.content=ips[c-'a'];
420      set<DNSResourceRecord> aset;
421      aset.insert(arr);
422      RC.replace(time(0), string(templ), QType(QType::A), aset, true); // auth, nuke it all
423     
424      nsset.insert(nsrr);
425    }
426  }
427  else {
428    ZoneParserTNG zpt(::arg()["hint-file"]);
429    DNSResourceRecord rr;
430    set<DNSResourceRecord> aset;
431
432    while(zpt.get(rr)) {
433      rr.ttl+=time(0);
434      if(rr.qtype.getCode()==QType::A) {
435        set<DNSResourceRecord> aset;
436        aset.insert(rr);
437        RC.replace(time(0), rr.qname, QType(QType::A), aset, true); // auth, etc see above
438      }
439      if(rr.qtype.getCode()==QType::NS) {
440        rr.content=toLower(rr.content);
441        nsset.insert(rr);
442      }
443    }
444  }
445  RC.replace(time(0),".", QType(QType::NS), nsset, true); // and stuff in the cache (auth)
446}
447
448map<ComboAddress, uint32_t> g_tcpClientCounts;
449
450struct TCPConnection
451{
452  int fd;
453  enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state;
454  int qlen;
455  int bytesread;
456  ComboAddress remote;
457  char data[65535];
458  time_t startTime;
459
460  void closeAndCleanup()
461  {
462    Utility::closesocket(fd);
463    if(!g_tcpClientCounts[remote]--) 
464      g_tcpClientCounts.erase(remote);
465    s_currentConnections--;
466  }
467  static unsigned int s_currentConnections; //!< total number of current TCP connections
468};
469
470unsigned int TCPConnection::s_currentConnections; 
471
472void startDoResolve(void *p)
473{
474  DNSComboWriter* dc=(DNSComboWriter *)p;
475  try {
476    uint16_t maxudpsize=512;
477    MOADNSParser::EDNSOpts edo;
478    if(dc->d_mdp.getEDNSOpts(&edo)) {
479      maxudpsize=edo.d_packetsize;
480    }
481   
482    vector<DNSResourceRecord> ret;
483   
484    vector<uint8_t> packet;
485    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
486
487    pw.getHeader()->aa=0;
488    pw.getHeader()->ra=1;
489    pw.getHeader()->qr=1;
490    pw.getHeader()->id=dc->d_mdp.d_header.id;
491    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
492
493    SyncRes sr(dc->d_now);
494    if(!g_quiet)
495      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
496       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
497
498    sr.setId(MT->getTid());
499    if(!dc->d_mdp.d_header.rd)
500      sr.setCacheOnly();
501
502    int res=sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
503    if(res<0) {
504      pw.getHeader()->rcode=RCode::ServFail;
505      // no commit here, because no record
506      g_stats.servFails++;
507    }
508    else {
509      pw.getHeader()->rcode=res;
510      switch(res) {
511      case RCode::ServFail:
512        g_stats.servFails++;
513        break;
514      case RCode::NXDomain:
515        g_stats.nxDomains++;
516        break;
517      case RCode::NoError:
518        g_stats.noErrors++;
519        break;
520      }
521     
522      if(ret.size()) {
523        shuffle(ret);
524        for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i) {
525          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place);
526          shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content));
527         
528          drc->toPacket(pw);
529       
530          if(!dc->d_tcp && pw.size() > maxudpsize) {
531            pw.rollback();
532            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
533              pw.getHeader()->tc=1;
534            goto sendit; // need to jump over pw.commit
535          }
536        }
537        pw.commit();
538      }
539    }
540  sendit:;
541    if(!dc->d_tcp) {
542      sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
543    }
544    else {
545      char buf[2];
546      buf[0]=packet.size()/256;
547      buf[1]=packet.size()%256;
548
549      Utility::iovec iov[2];
550
551      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
552      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
553
554      int ret=Utility::writev(dc->d_socket, iov, 2);
555      bool hadError=true;
556
557      if(ret == 0) 
558        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
559      else if(ret < 0 ) 
560        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
561      else if((unsigned int)ret != 2 + packet.size())
562        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
563      else
564        hadError=false;
565     
566      // update tcp connection status, either by closing or moving to 'BYTE0'
567
568      if(hadError) {
569        g_fdm->removeReadFD(dc->d_socket);
570  Utility::closesocket(dc->d_socket);
571      }
572      else {
573        any_cast<TCPConnection>(&g_fdm->getReadParameter(dc->d_socket))->state=TCPConnection::BYTE0;
574        struct timeval now; 
575        Utility::gettimeofday(&now, 0); // needs to be updated
576        g_fdm->setReadTTD(dc->d_socket, now, g_tcpTimeout);
577      }
578    }
579
580    if(!g_quiet) {
581      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
582      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
583        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
584    }
585   
586    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
587    float spent=makeFloat(sr.d_now-dc->d_now);
588    if(spent < 0.001)
589      g_stats.answers0_1++;
590    else if(spent < 0.010)
591      g_stats.answers1_10++;
592    else if(spent < 0.1)
593      g_stats.answers10_100++;
594    else if(spent < 1.0)
595      g_stats.answers100_1000++;
596    else
597      g_stats.answersSlow++;
598
599    uint64_t newLat=(uint64_t)(spent*1000000);
600    if(newLat < 1000000)  // outliers of several minutes exist..
601      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
602    delete dc;
603  }
604  catch(AhuException &ae) {
605    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
606  }
607  catch(MOADNSException& e) {
608    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
609  }
610  catch(exception& e) {
611    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
612  }
613  catch(...) {
614    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
615  }
616}
617
618RecursorControlChannel s_rcc;
619
620void makeControlChannelSocket()
621{
622  string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
623  if(::arg().mustDo("fork")) {
624    sockname+="."+lexical_cast<string>(Utility::getpid());
625    L<<Logger::Warning<<"Forked control socket name: "<<sockname<<endl;
626  }
627  s_rcc.listen(sockname);
628}
629
630void handleRunningTCPQuestion(int fd, boost::any& var)
631{
632  TCPConnection* conn=any_cast<TCPConnection>(&var);
633  if(conn->state==TCPConnection::BYTE0) {
634
635    int bytes=recv(conn->fd, conn->data, 2, 0);
636    if(bytes==1)
637      conn->state=TCPConnection::BYTE1;
638    if(bytes==2) { 
639      conn->qlen=(conn->data[0]<<8)+conn->data[1];
640      conn->bytesread=0;
641      conn->state=TCPConnection::GETQUESTION;
642    }
643    if(!bytes || bytes < 0) {
644      TCPConnection tmp(*conn); 
645      g_fdm->removeReadFD(fd);
646      tmp.closeAndCleanup();
647      return;
648    }
649  }
650  else if(conn->state==TCPConnection::BYTE1) {
651    int bytes=recv(conn->fd,conn->data+1,1,0);
652    if(bytes==1) {
653      conn->state=TCPConnection::GETQUESTION;
654      conn->qlen=(conn->data[0]<<8)+conn->data[1];
655      conn->bytesread=0;
656    }
657    if(!bytes || bytes < 0) {
658      if(g_logCommonErrors)
659        L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected after first byte"<<endl;
660      TCPConnection tmp(*conn); 
661      g_fdm->removeReadFD(fd);
662      tmp.closeAndCleanup();  // conn loses validity here..
663      return;
664    }
665  }
666  else if(conn->state==TCPConnection::GETQUESTION) {
667    int bytes=recv(conn->fd,conn->data + conn->bytesread,conn->qlen - conn->bytesread, 0);
668    if(!bytes || bytes < 0) {
669      L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected while reading question body"<<endl;
670      TCPConnection tmp(*conn);
671      g_fdm->removeReadFD(fd);
672      tmp.closeAndCleanup();  // conn loses validity here..
673
674      return;
675    }
676    conn->bytesread+=bytes;
677    if(conn->bytesread==conn->qlen) {
678      conn->state=TCPConnection::DONE;        // this makes us immune from timeouts, from now on *we* are responsible
679      DNSComboWriter* dc=0;
680      try {
681        dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
682      }
683      catch(MOADNSException &mde) {
684        g_stats.clientParseError++; 
685        L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->remote.toString() <<endl;
686        TCPConnection tmp(*conn); 
687        g_fdm->removeReadFD(fd);
688        tmp.closeAndCleanup();
689        return;
690      }
691     
692      dc->setSocket(conn->fd);
693      dc->d_tcp=true;
694      dc->setRemote(&conn->remote);
695      if(dc->d_mdp.d_header.qr)
696        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
697      else {
698        ++g_stats.qcounter;
699        ++g_stats.tcpqcounter;
700        MT->makeThread(startDoResolve, dc);
701        return;
702      }
703    }
704  }
705}
706
707//! Handle new incoming TCP connection
708void handleNewTCPQuestion(int fd, boost::any& )
709{
710  ComboAddress addr;
711  socklen_t addrlen=sizeof(addr);
712  int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
713  if(newsock>0) {
714    g_stats.addRemote(addr);
715    if(g_allowFrom && !g_allowFrom->match(&addr)) {
716      if(!g_quiet) 
717        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
718
719      g_stats.unauthorizedTCP++;
720      Utility::closesocket(newsock);
721      return;
722    }
723   
724    if(g_maxTCPPerClient && g_tcpClientCounts.count(addr) && g_tcpClientCounts[addr] >= g_maxTCPPerClient) {
725      g_stats.tcpClientOverflow++;
726      Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
727      return;
728    }
729    g_tcpClientCounts[addr]++;
730    Utility::setNonBlocking(newsock);
731    TCPConnection tc;
732    tc.fd=newsock;
733    tc.state=TCPConnection::BYTE0;
734    tc.remote=addr;
735    tc.startTime=g_now.tv_sec;
736    TCPConnection::s_currentConnections++;
737    g_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
738
739    struct timeval now;
740    Utility::gettimeofday(&now, 0);
741    g_fdm->setReadTTD(tc.fd, now, g_tcpTimeout);
742  }
743}
744 
745void handleNewUDPQuestion(int fd, boost::any& var)
746{
747  int len;
748  char data[1500];
749  ComboAddress fromaddr;
750  socklen_t addrlen=sizeof(fromaddr);
751
752  if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
753    g_stats.addRemote(fromaddr);
754    if(g_allowFrom && !g_allowFrom->match(&fromaddr)) {
755      cout<<"mapped: "<<fromaddr.isMappedIPv4()<<endl;
756      if(!g_quiet) 
757        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
758
759      g_stats.unauthorizedUDP++;
760      return;
761    }
762    try {
763      DNSComboWriter* dc = new DNSComboWriter(data, len, g_now);
764     
765      dc->setRemote(&fromaddr);
766     
767      if(dc->d_mdp.d_header.qr) {
768        if(g_logCommonErrors)
769          L<<Logger::Error<<"Ignoring answer from "<<dc->getRemote()<<" on server socket!"<<endl;
770      }
771      else {
772        ++g_stats.qcounter;
773        dc->setSocket(fd);
774        dc->d_tcp=false;
775        MT->makeThread(startDoResolve, (void*) dc);
776      }
777    }
778    catch(MOADNSException& mde) {
779      g_stats.clientParseError++; 
780      L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
781    }
782  }
783}
784
785typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
786deferredAdd_t deferredAdd;
787
788void makeTCPServerSockets()
789{
790  int fd;
791  vector<string>locals;
792  stringtok(locals,::arg()["local-address"]," ,");
793
794  if(locals.empty())
795    throw AhuException("No local address specified");
796 
797  ComboAddress sin;
798  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
799    memset((char *)&sin,0, sizeof(sin));
800    sin.sin4.sin_family = AF_INET;
801    if(!IpToU32(*i, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
802      sin.sin6.sin6_family = AF_INET6;
803      if(Utility::inet_pton(AF_INET6, i->c_str(), &sin.sin6.sin6_addr) <= 0)
804        throw AhuException("Unable to resolve local address '"+ *i +"'"); 
805    }
806
807    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
808    if(fd<0) 
809      throw AhuException("Making a TCP server socket for resolver: "+stringerror());
810
811    int tmp=1;
812    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
813      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
814      exit(1);
815    }
816   
817#ifdef TCP_DEFER_ACCEPT
818    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
819      if(i==locals.begin())
820        L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
821    }
822#endif
823
824    sin.sin4.sin_port = htons(::arg().asNum("local-port")); 
825    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
826    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) 
827      throw AhuException("Binding TCP server socket for "+*i+": "+stringerror());
828   
829    Utility::setNonBlocking(fd);
830    setSendBuffer(fd, 65000);
831    listen(fd, 128);
832    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
833    //    g_fdm->addReadFD(fd, handleNewTCPQuestion);
834    if(sin.sin4.sin_family == AF_INET) 
835      L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<::arg().asNum("local-port")<<endl;
836    else
837      L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<::arg().asNum("local-port")<<endl;
838  }
839}
840
841void makeUDPServerSockets()
842{
843  vector<string>locals;
844  stringtok(locals,::arg()["local-address"]," ,");
845
846  if(locals.empty())
847    throw AhuException("No local address specified");
848 
849  if(::arg()["local-address"]=="0.0.0.0") {
850    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
851  }
852
853  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
854    ComboAddress sin;
855
856    memset(&sin, 0, sizeof(sin));
857    sin.sin4.sin_family = AF_INET;
858    if(!IpToU32(*i, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
859      sin.sin6.sin6_family = AF_INET6;
860      if(Utility::inet_pton(AF_INET6, i->c_str(), &sin.sin6.sin6_addr) <= 0)
861        throw AhuException("Unable to resolve local address '"+ *i +"'"); 
862    }
863   
864    int fd=socket(sin.sin4.sin_family, SOCK_DGRAM,0);
865    if(fd < 0) {
866      throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
867    }
868
869    setReceiveBuffer(fd, 200000);
870    sin.sin4.sin_port = htons(::arg().asNum("local-port")); 
871
872    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
873    if (::bind(fd, (struct sockaddr *)&sin, socklen)<0) 
874      throw AhuException("Resolver binding to server socket on port "+::arg()["local-port"]+" for "+*i+": "+stringerror());
875   
876    Utility::setNonBlocking(fd);
877    //    g_fdm->addReadFD(fd, handleNewUDPQuestion);
878    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
879    g_tcpListenSockets.push_back(fd);
880    if(sin.sin4.sin_family == AF_INET) 
881      L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<::arg().asNum("local-port")<<endl;
882    else
883      L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<::arg().asNum("local-port")<<endl;
884  }
885}
886
887
888#ifndef WIN32
889void daemonize(void)
890{
891  if(fork())
892    exit(0); // bye bye
893 
894  setsid(); 
895
896  int i=open("/dev/null",O_RDWR); /* open stdin */
897  if(i < 0) 
898    L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
899  else {
900    dup2(i,0); /* stdin */
901    dup2(i,1); /* stderr */
902    dup2(i,2); /* stderr */
903    close(i);
904  }
905}
906#endif
907
908uint64_t counter;
909bool statsWanted;
910
911
912void usr1Handler(int)
913{
914  statsWanted=true;
915}
916
917
918
919void usr2Handler(int)
920{
921  SyncRes::setLog(true);
922  g_quiet=false;
923  ::arg().set("quiet")="no";
924
925}
926
927void doStats(void)
928{
929  if(g_stats.qcounter) {
930    L<<Logger::Error<<"stats: "<<g_stats.qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
931     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits"<<endl;
932    L<<Logger::Error<<"stats: throttle map: "<<SyncRes::s_throttle.size()<<", ns speeds: "
933     <<SyncRes::s_nsSpeeds.size()<<endl; // ", bytes: "<<RC.bytes()<<endl;
934    L<<Logger::Error<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
935    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
936     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
937    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
938  }
939  else if(statsWanted) 
940    L<<Logger::Error<<"stats: no stats yet!"<<endl;
941
942  statsWanted=false;
943}
944
945static void houseKeeping(void *)
946try
947{
948  static time_t last_stat, last_rootupdate, last_prune;
949  struct timeval now;
950  Utility::gettimeofday(&now, 0);
951
952  if(now.tv_sec - last_prune > 300) { 
953    DTime dt;
954    dt.setTimeval(now);
955    RC.doPrune();
956   
957    typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
958    negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(SyncRes::s_negcache);
959
960    negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
961    ttdindex.erase(ttdindex.begin(), i);
962
963    time_t limit=now.tv_sec-300;
964    for(SyncRes::nsspeeds_t::iterator i = SyncRes::s_nsSpeeds.begin() ; i!= SyncRes::s_nsSpeeds.end(); )
965      if(i->second.stale(limit))
966        SyncRes::s_nsSpeeds.erase(i++);
967      else
968        ++i;
969
970    //   cerr<<"Pruned "<<pruned<<" records, left "<<SyncRes::s_negcache.size()<<"\n";
971//    cout<<"Prune took "<<dt.udiff()<<"usec\n";
972    last_prune=time(0);
973  }
974  if(now.tv_sec - last_stat>1800) { 
975    doStats();
976    last_stat=time(0);
977  }
978  if(now.tv_sec - last_rootupdate > 7200) {
979    SyncRes sr(now);
980    vector<DNSResourceRecord> ret;
981
982    sr.setNoCache();
983    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
984    if(!res) {
985      L<<Logger::Error<<"Refreshed . records"<<endl;
986      last_rootupdate=now.tv_sec;
987    }
988    else
989      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
990  }
991}
992catch(AhuException& ae)
993{
994  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
995  throw;
996}
997;
998
// Extract the question name and type straight from a raw DNS packet.
//
//   packet  start of the DNS message (12 byte header, then the question)
//   len     number of valid bytes at 'packet'
//   type    out: QTYPE of the question, or 0 if it could not be read
//
// Returns the question name as dot-terminated labels ("www.example.com."),
// or "." for the root / when no label could be read. Every length byte is
// taken as a plain label length; compression pointers are not interpreted.
//
// All reads are bounded by 'len': a packet shorter than the header, a missing
// length byte or a label running past the end stops parsing and leaves 'type'
// at 0. Bytes are read as unsigned so that types with the high bit set in
// either octet (e.g. 255, ANY) are not sign-extended.
std::string questionExpand(const char* packet, uint16_t len, uint16_t& type)
{
  type=0;
  std::string ret;
  if(len < 12)
    return ".";

  const unsigned char* pos=reinterpret_cast<const unsigned char*>(packet)+12;
  const unsigned char* const end=reinterpret_cast<const unsigned char*>(packet)+len;
  bool terminated=false;  // did we see the zero-length root label?

  ret.reserve(len-12);
  while(pos < end) {
    const unsigned char labellen=*pos++;
    if(!labellen) {
      terminated=true;
      break;
    }
    if(labellen > end - pos)  // label would run off the end of the packet
      break;
    ret.append(reinterpret_cast<const char*>(pos), labellen);
    ret.append(1,'.');
    pos+=labellen;
  }
  if(ret.empty())
    ret=".";

  // the type only follows a properly terminated name
  if(terminated && end - pos >= 2)
    type=static_cast<uint16_t>((pos[0] << 8) | pos[1]);

  return ret;
}
1022
1023
1024void handleRCC(int fd, boost::any& var)
1025{
1026  string remote;
1027  string msg=s_rcc.recv(&remote);
1028  RecursorControlParser rcp;
1029  RecursorControlParser::func_t* command;
1030  string answer=rcp.getAnswer(msg, &command);
1031  try {
1032    s_rcc.send(answer, &remote);
1033    command();
1034  }
1035  catch(exception& e) {
1036    L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1037  }
1038  catch(AhuException& ae) {
1039    L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1040  }
1041}
1042
1043void handleTCPClientReadable(int fd, boost::any& var)
1044{
1045  PacketID* pident=any_cast<PacketID>(&var);
1046  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1047
1048  shared_array<char> buffer(new char[pident->inNeeded]);
1049
1050  int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1051  if(ret > 0) {
1052    pident->inMSG.append(&buffer[0], &buffer[ret]);
1053    pident->inNeeded-=ret;
1054    if(!pident->inNeeded) {
1055      //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1056      PacketID pid=*pident;
1057      string msg=pident->inMSG;
1058     
1059      g_fdm->removeReadFD(fd);
1060      MT->sendEvent(pid, &msg); 
1061    }
1062    else {
1063      //      cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1064    }
1065  }
1066  else {
1067    PacketID tmp=*pident;
1068    g_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1069    string empty;
1070    MT->sendEvent(tmp, &empty); // this conveys error status
1071  }
1072}
1073
1074void handleTCPClientWritable(int fd, boost::any& var)
1075{
1076  PacketID* pid=any_cast<PacketID>(&var);
1077 
1078  int ret=send(fd, pid->outMSG.c_str(), pid->outMSG.size() - pid->outPos,0);
1079  if(ret > 0) {
1080    pid->outPos+=ret;
1081    if(pid->outPos==pid->outMSG.size()) {
1082      PacketID tmp=*pid;
1083      g_fdm->removeWriteFD(fd);
1084      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1085    }
1086  }
1087  else {  // error or EOF
1088    PacketID tmp(*pid);
1089    g_fdm->removeWriteFD(fd);
1090    string sent;
1091    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1092  }
1093}
1094
1095// resend event to everybody chained onto it
1096void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1097{
1098  if(iter->key.chain.empty())
1099    return;
1100
1101  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1102    resend.fd=-1;
1103    resend.id=*i;
1104    MT->sendEvent(resend, &content);
1105    g_stats.chainResends++;
1106    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<": "<< res <<endl;
1107  }
1108}
1109
1110void handleUDPServerResponse(int fd, boost::any& var)
1111{
1112  PacketID pid=any_cast<PacketID>(var);
1113  int len;
1114  char data[1500];
1115  ComboAddress fromaddr;
1116  socklen_t addrlen=sizeof(fromaddr);
1117
1118  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1119
1120  if(len < (int)sizeof(dnsheader)) {
1121    if(len < 0)
1122      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1123    else {
1124      g_stats.serverParseError++; 
1125      if(g_logCommonErrors)
1126        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr) <<
1127          ": packet smalller than DNS header"<<endl;
1128    }
1129
1130    g_udpclientsocks.returnSocket(fd);
1131    string empty;
1132
1133    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1134    if(iter != MT->d_waiters.end()) 
1135      doResends(iter, pid, empty);
1136   
1137    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1138    return;
1139  } 
1140
1141  dnsheader dh;
1142  memcpy(&dh, data, sizeof(dh));
1143 
1144  if(!dh.qdcount) // UPC, Nominum?
1145    return;
1146 
1147  if(dh.qr) {
1148    PacketID pident;
1149    pident.remote=fromaddr;
1150    pident.id=dh.id;
1151    pident.fd=fd;
1152    pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1153    string packet;
1154    packet.assign(data, len);
1155
1156    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1157    if(iter != MT->d_waiters.end()) {
1158      doResends(iter, pident, packet);
1159    }
1160
1161    if(!MT->sendEvent(pident, &packet)) {
1162      // if(g_logCommonErrors)
1163      //   L<<Logger::Warning<<"Discarding unexpected packet from "<<sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen)<<endl;
1164      g_stats.unexpectedCount++;
1165     
1166      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1167        if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
1168           !Utility::strcasecmp(pident.domain.c_str(), mthread->key.domain.c_str())) {
1169          mthread->key.nearMisses++;
1170        }
1171      }
1172    }
1173    else if(fd >= 0)
1174      g_udpclientsocks.returnSocket(fd);
1175  }
1176  else
1177    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr)  <<endl;
1178}
1179
1180FDMultiplexer* getMultiplexer()
1181{
1182  FDMultiplexer* ret;
1183  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1184      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1185    try {
1186      ret=i->second();
1187      L<<Logger::Error<<"Enabled '"<<ret->getName()<<"' multiplexer"<<endl;
1188      return ret;
1189    }
1190    catch(FDMultiplexerException &fe) {
1191      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1192    }
1193    catch(...) {
1194      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1195    }
1196  }
1197  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1198  exit(1);
1199}
1200
1201static void makeNameToIPZone(const string& hostname, const string& ip)
1202{
1203  SyncRes::AuthDomain ad;
1204  DNSResourceRecord rr;
1205  rr.qname=toCanonic("", hostname);
1206  rr.d_place=DNSResourceRecord::ANSWER;
1207  rr.ttl=86400;
1208  rr.qtype=QType::SOA;
1209  rr.content="localhost. root 1 604800 86400 2419200 604800";
1210 
1211  ad.d_records.insert(rr);
1212
1213  rr.qtype=QType::NS;
1214  rr.content="localhost.";
1215
1216  ad.d_records.insert(rr);
1217 
1218  rr.qtype=QType::A;
1219  rr.content=ip;
1220  ad.d_records.insert(rr);
1221 
1222  if(SyncRes::s_domainmap.count(rr.qname)) {
1223    L<<Logger::Warning<<"Hosts file will not overwrite zone '"<<rr.qname<<"' already loaded"<<endl;
1224  }
1225  else {
1226    L<<Logger::Warning<<"Inserting forward zone '"<<rr.qname<<"' based on hosts file"<<endl;
1227    SyncRes::s_domainmap[rr.qname]=ad;
1228  }
1229}
1230
1231//! parts[0] must be an IP address, the rest must be host names
1232static void makeIPToNamesZone(const vector<string>& parts) 
1233{
1234  string address=parts[0];
1235  vector<string> ipparts;
1236  stringtok(ipparts, address,".");
1237 
1238  SyncRes::AuthDomain ad;
1239  DNSResourceRecord rr;
1240  for(int n=ipparts.size()-1; n>=0 ; --n) {
1241    rr.qname.append(ipparts[n]);
1242    rr.qname.append(1,'.');
1243  }
1244  rr.qname.append("in-addr.arpa.");
1245
1246  rr.d_place=DNSResourceRecord::ANSWER;
1247  rr.ttl=86400;
1248  rr.qtype=QType::SOA;
1249  rr.content="localhost. root. 1 604800 86400 2419200 604800";
1250 
1251  ad.d_records.insert(rr);
1252
1253  rr.qtype=QType::NS;
1254  rr.content="localhost.";
1255
1256  ad.d_records.insert(rr);
1257  rr.qtype=QType::PTR;
1258
1259  if(ipparts.size()==4)  // otherwise this is a partial zone
1260    for(unsigned int n=1; n < parts.size(); ++n) {
1261      rr.content=toCanonic("", parts[n]);
1262      ad.d_records.insert(rr);
1263    }
1264
1265  if(SyncRes::s_domainmap.count(rr.qname)) {
1266    L<<Logger::Warning<<"Will not overwrite zone '"<<rr.qname<<"' already loaded"<<endl;
1267  }
1268  else {
1269    if(ipparts.size()==4)
1270      L<<Logger::Warning<<"Inserting reverse zone '"<<rr.qname<<"' based on hosts file"<<endl;
1271    SyncRes::s_domainmap[rr.qname]=ad;
1272  }
1273}
1274
1275
1276void parseAuthAndForwards();
1277
1278string reloadAuthAndForwards()
1279{
1280  SyncRes::domainmap_t original=SyncRes::s_domainmap;
1281 
1282  try {
1283    L<<Logger::Warning<<"Reloading zones, purging data from cache"<<endl;
1284 
1285    for(SyncRes::domainmap_t::const_iterator i = SyncRes::s_domainmap.begin(); i != SyncRes::s_domainmap.end(); ++i) {
1286      for(SyncRes::AuthDomain::records_t::const_iterator j = i->second.d_records.begin(); j != i->second.d_records.end(); ++j) 
1287        RC.doWipeCache(j->qname);
1288    }
1289
1290    string configname=::arg()["config-dir"]+"/recursor.conf";
1291    cleanSlashes(configname);
1292   
1293    if(!::arg().preParseFile(configname.c_str(), "forward-zones")) 
1294      L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1295   
1296    ::arg().preParseFile(configname.c_str(), "auth-zones");
1297    ::arg().preParseFile(configname.c_str(), "export-etc-hosts");
1298    ::arg().preParseFile(configname.c_str(), "serve-rfc1918");
1299   
1300    parseAuthAndForwards();
1301   
1302    // purge again - new zones need to blank out the cache
1303    for(SyncRes::domainmap_t::const_iterator i = SyncRes::s_domainmap.begin(); i != SyncRes::s_domainmap.end(); ++i) {
1304      for(SyncRes::AuthDomain::records_t::const_iterator j = i->second.d_records.begin(); j != i->second.d_records.end(); ++j) 
1305        RC.doWipeCache(j->qname);
1306    }
1307
1308    // this is pretty blunt
1309    SyncRes::s_negcache.clear(); 
1310    return "ok\n";
1311  }
1312  catch(exception& e) {
1313    L<<Logger::Error<<"Had error reloading zones, keeping original data: "<<e.what()<<endl;
1314  }
1315  catch(AhuException& ae) {
1316    L<<Logger::Error<<"Encountered error reloading zones, keeping original data: "<<ae.reason<<endl;
1317  }
1318  catch(...) {
1319    L<<Logger::Error<<"Encountered unknown error reloading zones, keeping original data"<<endl;
1320  }
1321  SyncRes::s_domainmap.swap(original);
1322  return "reloading failed, see log\n";
1323}
1324
1325void parseAuthAndForwards()
1326{
1327  SyncRes::s_domainmap.clear(); // this makes us idempotent
1328
1329  TXTRecordContent::report();
1330
1331  typedef vector<string> parts_t;
1332  parts_t parts; 
1333  for(int n=0; n < 2 ; ++n ) {
1334    parts.clear();
1335    stringtok(parts, ::arg()[n ? "forward-zones" : "auth-zones"], ",\t\n\r");
1336    for(parts_t::const_iterator iter = parts.begin(); iter != parts.end(); ++iter) {
1337      SyncRes::AuthDomain ad;
1338      pair<string,string> headers=splitField(*iter, '=');
1339      trim(headers.first);
1340      trim(headers.second);
1341      headers.first=toCanonic("", headers.first);
1342      if(n==0) {
1343        L<<Logger::Error<<"Parsing authoritative data for zone '"<<headers.first<<"' from file '"<<headers.second<<"'"<<endl;
1344        ZoneParserTNG zpt(headers.second, headers.first);
1345        DNSResourceRecord rr;
1346        while(zpt.get(rr)) {
1347          try {
1348            string tmp=DNSRR2String(rr);
1349            rr=String2DNSRR(rr.qname, rr.qtype, tmp, 3600);
1350          }
1351          catch(exception &e) {
1352            throw AhuException("Error parsing record '"+rr.qname+"' of type "+rr.qtype.getName()+" in zone '"+headers.first+"' from file '"+headers.second+"': "+e.what());
1353          }
1354          catch(...) {
1355            throw AhuException("Error parsing record '"+rr.qname+"' of type "+rr.qtype.getName()+" in zone '"+headers.first+"' from file '"+headers.second+"'");
1356          }
1357
1358          ad.d_records.insert(rr);
1359
1360        }
1361      }
1362      else {
1363        L<<Logger::Error<<"Redirecting queries for zone '"<<headers.first<<"' to IP '"<<headers.second<<"'"<<endl;
1364        ad.d_server=headers.second;
1365      }
1366     
1367      SyncRes::s_domainmap[headers.first]=ad;
1368    }
1369  }
1370 
1371  if(::arg().mustDo("export-etc-hosts")) {
1372    string line;
1373    string fname;
1374   
1375    ifstream ifs("/etc/hosts");
1376    if(!ifs) {
1377      L<<Logger::Warning<<"Could not open /etc/hosts for reading"<<endl;
1378      return;
1379    }
1380   
1381    string::size_type pos;
1382    while(getline(ifs,line)) {
1383      pos=line.find('#');
1384      if(pos!=string::npos)
1385        line.resize(pos);
1386      trim(line);
1387      if(line.empty())
1388        continue;
1389      parts.clear();
1390      stringtok(parts, line, "\t\r\n ");
1391      if(parts[0].find(':')!=string::npos)
1392        continue;
1393     
1394      for(unsigned int n=1; n < parts.size(); ++n)
1395        makeNameToIPZone(parts[n], parts[0]);
1396      makeIPToNamesZone(parts);
1397    }
1398  }
1399  if(::arg().mustDo("serve-rfc1918")) {
1400    L<<Logger::Warning<<"Inserting rfc 1918 private space zones"<<endl;
1401    parts.clear();
1402    parts.push_back("127");
1403    makeIPToNamesZone(parts);
1404    parts[0]="10";
1405    makeIPToNamesZone(parts);
1406
1407    parts[0]="192.168";
1408    makeIPToNamesZone(parts);
1409    for(int n=16; n < 32; n++) {
1410      parts[0]="172."+lexical_cast<string>(n);
1411      makeIPToNamesZone(parts);
1412    }
1413  }
1414}
1415
1416int serviceMain(int argc, char*argv[])
1417{
1418  L.setName("pdns_recursor");
1419  L.setLoglevel((Logger::Urgency)(4));
1420
1421  L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2006 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1422#ifdef __GNUC__
1423  L<<", gcc "__VERSION__;
1424#endif // add other compilers here
1425#ifdef _MSC_VER
1426  L<<", MSVC "<<_MSC_VER;
1427#endif
1428  L<<") starting up"<<endl;
1429 
1430  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1431    "This is free software, and you are welcome to redistribute it "
1432    "according to the terms of the GPL version 2."<<endl;
1433 
1434  L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1435 
1436  if(!::arg()["allow-from"].empty()) {
1437    g_allowFrom=new NetmaskGroup;
1438    vector<string> ips;
1439    stringtok(ips, ::arg()["allow-from"], ", ");
1440    L<<Logger::Warning<<"Only allowing queries from: ";
1441    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1442      g_allowFrom->addMask(*i);
1443      if(i!=ips.begin())
1444        L<<Logger::Warning<<", ";
1445      L<<Logger::Warning<<*i;
1446    }
1447    L<<Logger::Warning<<endl;
1448  }
1449  else if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
1450    L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1451 
1452  g_quiet=::arg().mustDo("quiet");
1453  if(::arg().mustDo("trace")) {
1454    SyncRes::setLog(true);
1455    ::arg().set("quiet")="no";
1456    g_quiet=false;
1457  }
1458
1459  RC.d_followRFC2181=::arg().mustDo("auth-can-lower-ttl");
1460 
1461  if(!::arg()["query-local-address6"].empty()) {
1462    SyncRes::s_doIPv6=true;
1463    L<<Logger::Error<<"Enabling IPv6 transport for outgoing queries"<<endl;
1464  }
1465 
1466  SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1467  SyncRes::s_serverID=::arg()["server-id"];
1468  if(SyncRes::s_serverID.empty()) {
1469    char tmp[128];
1470    gethostname(tmp, sizeof(tmp)-1);
1471    SyncRes::s_serverID=tmp;
1472  }
1473 
1474 
1475  parseAuthAndForwards();
1476 
1477  g_stats.remotes.resize(::arg().asNum("remotes-ringbuffer-entries"));
1478  if(!g_stats.remotes.empty())
1479    memset(&g_stats.remotes[0], 0, g_stats.remotes.size() * sizeof(RecursorStats::remotes_t::value_type));
1480  g_logCommonErrors=::arg().mustDo("log-common-errors");
1481 
1482  makeUDPServerSockets();
1483  makeTCPServerSockets();
1484 
1485#ifndef WIN32
1486  if(::arg().mustDo("fork")) {
1487    fork();
1488    L<<Logger::Warning<<"This is forked pid "<<getpid()<<endl;
1489  }
1490#endif
1491 
1492  MT=new MTasker<PacketID,string>(100000);
1493  makeControlChannelSocket();       
1494  PacketID pident;
1495  primeHints();   
1496  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1497#ifndef WIN32
1498  if(::arg().mustDo("daemon")) {
1499    L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1500    L.toConsole(Logger::Critical);
1501    daemonize();
1502  }
1503  signal(SIGUSR1,usr1Handler);
1504  signal(SIGUSR2,usr2Handler);
1505  signal(SIGPIPE,SIG_IGN);
1506  writePid();
1507#endif
1508  g_fdm=getMultiplexer();
1509 
1510  for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
1511    g_fdm->addReadFD(i->first, i->second);
1512 
1513  int newgid=0;
1514  if(!::arg()["setgid"].empty())
1515    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1516  int newuid=0;
1517  if(!::arg()["setuid"].empty())
1518    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1519 
1520#ifndef WIN32
1521  if (!::arg()["chroot"].empty()) {
1522    if (chroot(::arg()["chroot"].c_str())<0) {
1523      L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1524      exit(1);
1525    }
1526  }
1527 
1528  Utility::dropPrivs(newuid, newgid);
1529  g_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1530#endif
1531 
1532  counter=0;
1533  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1534  g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1535 
1536  g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1537 
1538 
1539  bool listenOnTCP(true);
1540 
1541  for(;;) {
1542    while(MT->schedule()); // housekeeping, let threads do their thing
1543     
1544    if(!(counter%500)) {
1545      MT->makeThread(houseKeeping,0);
1546    }
1547
1548    if(!(counter%11)) {
1549      typedef vector<pair<int, boost::any> > expired_t;
1550      expired_t expired=g_fdm->getTimeouts(g_now);
1551       
1552      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1553        TCPConnection conn=any_cast<TCPConnection>(i->second);
1554        if(conn.state != TCPConnection::DONE) {
1555          if(g_logCommonErrors)
1556            L<<Logger::Warning<<"Timeout from remote TCP client "<< conn.remote.toString() <<endl;
1557          g_fdm->removeReadFD(i->first);
1558          conn.closeAndCleanup();
1559        }
1560      }
1561    }
1562     
1563    counter++;
1564
1565    if(statsWanted) {
1566      doStats();
1567    }
1568
1569    Utility::gettimeofday(&g_now, 0);
1570    g_fdm->run(&g_now);
1571
1572    if(listenOnTCP) {
1573      if(TCPConnection::s_currentConnections > maxTcpClients) {  // shutdown
1574        for(g_tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1575          g_fdm->removeReadFD(*i);
1576        listenOnTCP=false;
1577      }
1578    }
1579    else {
1580      if(TCPConnection::s_currentConnections <= maxTcpClients) {  // reenable
1581        for(g_tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1582          g_fdm->addReadFD(*i, handleNewTCPQuestion);
1583        listenOnTCP=true;
1584      }
1585    }
1586  }
1587}
1588#ifdef WIN32
1589void doWindowsServiceArguments(RecursorService& recursor)
1590{
1591  if(::arg().mustDo( "register-service" )) {
1592    if ( !recursor.registerService( "The PowerDNS Recursor.", true )) {
1593      cerr << "Could not register service." << endl;
1594      exit( 99 );
1595    }
1596   
1597    exit( 0 );
1598  }
1599
1600  if ( ::arg().mustDo( "unregister-service" )) {
1601    recursor.unregisterService();
1602    exit( 0 );
1603  }
1604}
1605#endif
1606
// Program entry point: registers every configuration setting with its
// default, parses the command line and recursor.conf, handles --help and
// --config, then hands over to serviceMain() (or the NT service wrapper on
// Windows). Any exception escaping from there is logged and turned into
// EXIT_FAILURE.
int main(int argc, char **argv) 
{
  reportBasicTypes();

  int ret = EXIT_SUCCESS;
#ifdef WIN32
  RecursorService service;
  WSADATA wsaData;
  if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
    cerr<<"Unable to initialize winsock\n";
    exit(1);
  }
#endif // WIN32

  try {
    Utility::srandom(time(0));
    // settings and their defaults; each set() call takes the name and help text
    ::arg().set("soa-minimum-ttl","Don't change")="0";
    ::arg().set("soa-serial-offset","Don't change")="0";
    ::arg().set("no-shuffle","Don't change")="off";
    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
    ::arg().set("local-port","port to listen on")="53";
    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas")="127.0.0.1";
    ::arg().set("trace","if we should output heaps of logging")="off";
    ::arg().set("daemon","Operate as a daemon")="yes";
    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
    ::arg().set("chroot","switch to chroot jail")="";
    ::arg().set("setgid","If set, change group id to this gid for more security")="";
    ::arg().set("setuid","If set, change user id to this uid for more security")="";
#ifdef WIN32
    ::arg().set("quiet","Suppress logging of questions and answers")="off";
    ::arg().setSwitch( "register-service", "Register the service" )= "no";
    ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
    ::arg().setSwitch( "ntservice", "Run as service" )= "no";
    ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes"; 
    ::arg().setSwitch( "use-logfile", "Use a log file" )= "no"; 
    ::arg().setSwitch( "logfile", "Filename of the log file" )= "recursor.log"; 
#else
    ::arg().set("quiet","Suppress logging of questions and answers")="";
#endif
    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
    ::arg().set("query-local-address6","Source IPv6 address for sending queries")="";
    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
    ::arg().set("hint-file", "If set, load root hints from this file")="";
    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="0";
    ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
    ::arg().set("server-id", "Returned when queried for 'server.id' TXT, defaults to hostname")="";
    ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
    ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")="127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10";
    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
    ::arg().set("fork", "If set, fork the daemon for possible double performance")="no";
    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
    ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
    ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
    ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
    ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
    ::arg().set("auth-can-lower-ttl", "If we follow RFC 2181 to the letter, an authoritative server can lower the TTL of NS records")="off";

    ::arg().setCmd("help","Provide a helpful message");
    ::arg().setCmd("config","Output blank configuration");
    L.toConsole(Logger::Warning);
    // first pass over the command line: needed so that config-dir is known
    // before the configuration file is read
    ::arg().laxParse(argc,argv); // do a lax parse

    string configname=::arg()["config-dir"]+"/recursor.conf";
    cleanSlashes(configname);

    if(!::arg().file(configname.c_str())) 
      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;

    // second pass over the command line, after the configuration file was read
    ::arg().parse(argc,argv);

    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);

    if(::arg().mustDo("help")) {
      cerr<<"syntax:"<<endl<<endl;
      cerr<<::arg().helpstring(::arg()["help"])<<endl;
      exit(99);
    }

    if(::arg().mustDo("config")) {
      cout<<::arg().configstring()<<endl;
      exit(0);
    }

#ifndef WIN32
    serviceMain(argc, argv);
#else
    doWindowsServiceArguments(service);
        L.toNTLog();
    RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" )); 
#endif

  }
  // everything that escapes is logged and reported through the exit status
  catch(AhuException &ae) {
    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
    ret=EXIT_FAILURE;
  }
  catch(exception &e) {
    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
    ret=EXIT_FAILURE;
  }
  catch(...) {
    L<<Logger::Error<<"any other exception in main: "<<endl;
    ret=EXIT_FAILURE;
  }
 
#ifdef WIN32
  WSACleanup();
#endif // WIN32

  return ret;
}
Note: See TracBrowser for help on using the browser.