root/trunk/pdns/pdns/pdns_recursor.cc @ 855

Revision 855, 52.4 KB (checked in by ahu, 7 years ago)

slight speedup by testing the TCP sessions for timeouts less often, add 'current-queries' command which lists current outstanding queries

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2006  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#ifndef WIN32
20# include <netdb.h>
21# include <unistd.h>
22#else
23 #include "ntservice.hh"
24 #include "recursorservice.hh"
25#endif // WIN32
26
27#include "utility.hh"
28#include <iostream>
29#include <errno.h>
30#include <map>
31#include <set>
32#include "recursor_cache.hh"
33#include <stdio.h>
34#include <signal.h>
35#include <stdlib.h>
36
37#include "mtasker.hh"
38#include <utility>
39#include "arguments.hh"
40#include "syncres.hh"
41#include <fcntl.h>
42#include <fstream>
43#include "sstuff.hh"
44#include <boost/tuple/tuple.hpp>
45#include <boost/tuple/tuple_comparison.hpp>
46#include <boost/shared_array.hpp>
47#include <boost/lexical_cast.hpp>
48#include <boost/function.hpp>
49#include <boost/algorithm/string.hpp>
50#include "dnsparser.hh"
51#include "dnswriter.hh"
52#include "dnsrecords.hh"
53#include "zoneparser-tng.hh"
54#include "rec_channel.hh"
55#include "logger.hh"
56#include "iputils.hh"
57#include "mplexer.hh"
58#include "config.h"
59
60#ifndef RECURSOR
61#include "statbag.hh"
62StatBag S;
63#endif
64
65FDMultiplexer* g_fdm;
66unsigned int g_maxTCPPerClient;
67bool g_logCommonErrors;
68using namespace boost;
69
70#ifdef __FreeBSD__           // see cvstrac ticket #26
71#include <pthread.h>
72#include <semaphore.h>
73#endif
74
75MemRecursorCache RC;
76RecursorStats g_stats;
77bool g_quiet;
78NetmaskGroup* g_allowFrom;
79string s_programname="pdns_recursor";
80typedef vector<int> g_tcpListenSockets_t;
81g_tcpListenSockets_t g_tcpListenSockets;
82int g_tcpTimeout;
83
84struct DNSComboWriter {
85  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), d_tcp(false), d_socket(-1)
86  {}
87  MOADNSParser d_mdp;
88  void setRemote(ComboAddress* sa)
89  {
90    d_remote=*sa;
91  }
92
93  void setSocket(int sock)
94  {
95    d_socket=sock;
96  }
97
98  string getRemote() const
99  {
100    return d_remote.toString();
101  }
102
103  struct timeval d_now;
104  ComboAddress d_remote;
105  bool d_tcp;
106  int d_socket;
107};
108
109
110#ifndef WIN32
111#ifndef __FreeBSD__
112extern "C" {
113  int sem_init(sem_t*, int, unsigned int){return 0;}
114  int sem_wait(sem_t*){return 0;}
115  int sem_trywait(sem_t*){return 0;}
116  int sem_post(sem_t*){return 0;}
117  int sem_getvalue(sem_t*, int*){return 0;}
118  pthread_t pthread_self(void){return (pthread_t) 0;}
119  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr){ return 0; }
120  int pthread_mutex_lock(pthread_mutex_t *mutex){ return 0; }
121  int pthread_mutex_unlock(pthread_mutex_t *mutex) { return 0; }
122  int pthread_mutex_destroy(pthread_mutex_t *mutex) { return 0; }
123}
124#endif // __FreeBSD__
125#endif // WIN32
126
127ArgvMap &arg()
128{
129  static ArgvMap theArg;
130  return theArg;
131}
132
133struct timeval g_now;
134typedef vector<int> tcpserversocks_t;
135
136MT_t* MT; // the big MTasker
137
138void handleTCPClientWritable(int fd, boost::any& var);
139
140// -1 is error, 0 is timeout, 1 is success
141int asendtcp(const string& data, Socket* sock) 
142{
143  PacketID pident;
144  pident.sock=sock;
145  pident.outMSG=data;
146
147  g_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
148  string packet;
149
150  int ret=MT->waitEvent(pident,&packet,1);
151  if(!ret || ret==-1) { // timeout
152    g_fdm->removeWriteFD(sock->getHandle());
153  }
154  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
155    return -1;
156  }
157  return ret;
158}
159
160void handleTCPClientReadable(int fd, boost::any& var);
161
162// -1 is error, 0 is timeout, 1 is success
163int arecvtcp(string& data, int len, Socket* sock) 
164{
165  data.clear();
166  PacketID pident;
167  pident.sock=sock;
168  pident.inNeeded=len;
169  g_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
170
171  int ret=MT->waitEvent(pident,&data,1);
172  if(!ret || ret==-1) { // timeout
173    g_fdm->removeReadFD(sock->getHandle());
174  }
175  else if(data.empty()) {// error, EOF or other
176    return -1;
177  }
178
179  return ret;
180}
181
182// returns -1 for errors which might go away, throws for ones that won't
183int makeClientSocket(int family)
184{
185  int ret=(int)socket(family, SOCK_DGRAM, 0);
186  if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
187    return ret;
188
189  if(ret<0) 
190    throw AhuException("Making a socket for resolver: "+stringerror());
191
192  static optional<ComboAddress> sin4;
193  if(!sin4) {
194    sin4=ComboAddress(::arg()["query-local-address"]);
195  }
196  static optional<ComboAddress> sin6;
197  if(!sin6) {
198    if(!::arg()["query-local-address6"].empty())
199    sin6=ComboAddress(::arg()["query-local-address6"]);
200  }
201
202  int tries=10;
203  while(--tries) {
204    uint16_t port=1025+Utility::random()%64510;
205    if(tries==1)  // fall back to kernel 'random'
206        port=0;
207
208    if(family==AF_INET) {
209      sin4->sin4.sin_port = htons(port); 
210     
211      if (::bind(ret, (struct sockaddr *)&*sin4, sin4->getSocklen()) >= 0) 
212        break;
213    }
214    else {
215      sin6->sin6.sin6_port = htons(port); 
216     
217      if (::bind(ret, (struct sockaddr *)&*sin6, sin6->getSocklen()) >= 0) 
218        break;
219    }
220  }
221  if(!tries)
222    throw AhuException("Resolver binding to local query client socket: "+stringerror());
223
224  Utility::setNonBlocking(ret);
225  return ret;
226}
227
228void handleUDPServerResponse(int fd, boost::any&);
229
230// you can ask this class for a UDP socket to send a query from
231// this socket is not yours, don't even think about deleting it
232// but after you call 'returnSocket' on it, don't assume anything anymore
233class UDPClientSocks
234{
235  unsigned int d_numsocks;
236  unsigned int d_maxsocks;
237
238public:
239  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
240  {
241  }
242
243  typedef set<int> socks_t;
244  socks_t d_socks;
245
246  // returning -1 means: temporary OS error (ie, out of files)
247  int getSocket(uint16_t family)
248  {
249    int fd=makeClientSocket(family);
250    if(fd < 0) // temporary error - receive exception otherwise
251      return -1;
252
253    d_socks.insert(fd);
254    d_numsocks++;
255    return fd;
256  }
257
258  void returnSocket(int fd)
259  {
260    socks_t::iterator i=d_socks.find(fd);
261    if(i==d_socks.end()) {
262      throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
263    }
264    returnSocket(i);
265  }
266
267  // return a socket to the pool, or simply erase it
268  void returnSocket(socks_t::iterator& i)
269  {
270    if(i==d_socks.end()) {
271      throw AhuException("Trying to return a socket not in the pool");
272    }
273    g_fdm->removeReadFD(*i);
274    Utility::closesocket(*i);
275   
276    d_socks.erase(i++);
277    --d_numsocks;
278  }
279}g_udpclientsocks;
280
281
282/* these two functions are used by LWRes */
283// -2 is OS error, -1 is error that depends on the remote, > 0 is success
284int asendto(const char *data, int len, int flags, 
285            const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd) 
286{
287
288  PacketID pident;
289  pident.domain = domain;
290  pident.remote = toaddr;
291  pident.type = qtype;
292
293  // see if there is an existing outstanding request we can chain on to, using partial equivalence function
294  pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
295
296  for(; chain.first != chain.second; chain.first++) {
297    if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
298      //      cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
299      // cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
300      // <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
301     
302      chain.first->key.chain.insert(id); // we can chain
303      *fd=-1;                            // gets used in waitEvent / sendEvent later on
304      return 1;
305    }
306  }
307
308  *fd=g_udpclientsocks.getSocket(toaddr.sin4.sin_family);
309  if(*fd < 0)
310    return -2;
311
312  pident.fd=*fd;
313  pident.id=id;
314 
315  int ret=connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen());
316  if(ret < 0) {
317    if(errno==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
318      return -2;
319    return ret;
320  }
321
322  g_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
323  return send(*fd, data, len, 0);
324}
325
326// -1 is error, 0 is timeout, 1 is success
327int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len, 
328              uint16_t id, const string& domain, uint16_t qtype, int fd, unsigned int now)
329{
330  static optional<unsigned int> nearMissLimit;
331  if(!nearMissLimit) 
332    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
333
334  PacketID pident;
335  pident.fd=fd;
336  pident.id=id;
337  pident.domain=domain;
338  pident.type = qtype;
339  pident.remote=fromaddr;
340
341  string packet;
342  int ret=MT->waitEvent(pident, &packet, 1, now);
343
344  if(ret > 0) {
345    if(packet.empty()) // means "error"
346      return -1; 
347
348    *d_len=(int)packet.size();
349    memcpy(data,packet.c_str(),min(len,*d_len));
350    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
351      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
352      g_stats.spoofCount++;
353      return -1;
354    }
355  }
356  else {
357    if(fd >= 0)
358      g_udpclientsocks.returnSocket(fd);
359  }
360  return ret;
361}
362
363void setBuffer(int fd, int optname, uint32_t size)
364{
365  uint32_t psize=0;
366  socklen_t len=sizeof(psize);
367 
368  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
369    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
370    return; 
371  }
372
373  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
374    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
375}
376
377
378static void setReceiveBuffer(int fd, uint32_t size)
379{
380  setBuffer(fd, SO_RCVBUF, size);
381}
382
383static void setSendBuffer(int fd, uint32_t size)
384{
385  setBuffer(fd, SO_SNDBUF, size);
386}
387
388static void writePid(void)
389{
390  string fname=::arg()["socket-dir"]+"/"+s_programname+".pid";
391  ofstream of(fname.c_str());
392  if(of)
393    of<< Utility::getpid() <<endl;
394  else
395    L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
396}
397
398void primeHints(void)
399{
400  // prime root cache
401  set<DNSResourceRecord>nsset;
402
403  if(::arg()["hint-file"].empty()) {
404    static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
405                       "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
406    DNSResourceRecord arr, nsrr;
407    arr.qtype=QType::A;
408    arr.ttl=time(0)+3600000;
409    nsrr.qtype=QType::NS;
410    nsrr.ttl=time(0)+3600000;
411   
412    for(char c='a';c<='m';++c) {
413      static char templ[40];
414      strncpy(templ,"a.root-servers.net.", sizeof(templ) - 1);
415      *templ=c;
416      arr.qname=nsrr.content=templ;
417      arr.content=ips[c-'a'];
418      set<DNSResourceRecord> aset;
419      aset.insert(arr);
420      RC.replace(time(0), string(templ), QType(QType::A), aset, true); // auth, nuke it all
421     
422      nsset.insert(nsrr);
423    }
424  }
425  else {
426    ZoneParserTNG zpt(::arg()["hint-file"]);
427    DNSResourceRecord rr;
428    set<DNSResourceRecord> aset;
429
430    while(zpt.get(rr)) {
431      rr.ttl+=time(0);
432      if(rr.qtype.getCode()==QType::A) {
433        set<DNSResourceRecord> aset;
434        aset.insert(rr);
435        RC.replace(time(0), rr.qname, QType(QType::A), aset, true); // auth, etc see above
436      }
437      if(rr.qtype.getCode()==QType::NS) {
438        rr.content=toLower(rr.content);
439        nsset.insert(rr);
440      }
441    }
442  }
443  RC.replace(time(0),".", QType(QType::NS), nsset, true); // and stuff in the cache (auth)
444}
445
446map<ComboAddress, uint32_t> g_tcpClientCounts;
447
448struct TCPConnection
449{
450  int fd;
451  enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state;
452  int qlen;
453  int bytesread;
454  ComboAddress remote;
455  char data[65535];
456  time_t startTime;
457
458  void closeAndCleanup()
459  {
460    Utility::closesocket(fd);
461    if(!g_tcpClientCounts[remote]--) 
462      g_tcpClientCounts.erase(remote);
463    s_currentConnections--;
464  }
465  static unsigned int s_currentConnections; //!< total number of current TCP connections
466};
467
468unsigned int TCPConnection::s_currentConnections; 
469
470void startDoResolve(void *p)
471{
472  DNSComboWriter* dc=(DNSComboWriter *)p;
473  try {
474    uint16_t maxudpsize=512;
475    MOADNSParser::EDNSOpts edo;
476    if(dc->d_mdp.getEDNSOpts(&edo)) {
477      maxudpsize=edo.d_packetsize;
478    }
479   
480    vector<DNSResourceRecord> ret;
481   
482    vector<uint8_t> packet;
483    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
484
485    pw.getHeader()->aa=0;
486    pw.getHeader()->ra=1;
487    pw.getHeader()->qr=1;
488    pw.getHeader()->id=dc->d_mdp.d_header.id;
489    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
490
491    SyncRes sr(dc->d_now);
492    if(!g_quiet)
493      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
494       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
495
496    sr.setId(MT->getTid());
497    if(!dc->d_mdp.d_header.rd)
498      sr.setCacheOnly();
499
500    int res=sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
501    if(res<0) {
502      pw.getHeader()->rcode=RCode::ServFail;
503      // no commit here, because no record
504      g_stats.servFails++;
505    }
506    else {
507      pw.getHeader()->rcode=res;
508      switch(res) {
509      case RCode::ServFail:
510        g_stats.servFails++;
511        break;
512      case RCode::NXDomain:
513        g_stats.nxDomains++;
514        break;
515      case RCode::NoError:
516        g_stats.noErrors++;
517        break;
518      }
519     
520      if(ret.size()) {
521        shuffle(ret);
522        for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i) {
523          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place);
524          shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content));
525         
526          drc->toPacket(pw);
527       
528          if(!dc->d_tcp && pw.size() > maxudpsize) {
529            pw.rollback();
530            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
531              pw.getHeader()->tc=1;
532            goto sendit; // need to jump over pw.commit
533          }
534        }
535        pw.commit();
536      }
537    }
538  sendit:;
539    if(!dc->d_tcp) {
540      sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
541    }
542    else {
543      char buf[2];
544      buf[0]=packet.size()/256;
545      buf[1]=packet.size()%256;
546
547      Utility::iovec iov[2];
548
549      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
550      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
551
552      int ret=Utility::writev(dc->d_socket, iov, 2);
553      bool hadError=true;
554
555      if(ret == 0) 
556        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
557      else if(ret < 0 ) 
558        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
559      else if((unsigned int)ret != 2 + packet.size())
560        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
561      else
562        hadError=false;
563     
564      // update tcp connection status, either by closing or moving to 'BYTE0'
565
566      if(hadError) {
567        g_fdm->removeReadFD(dc->d_socket);
568        Utility::closesocket(dc->d_socket);
569      }
570      else {
571        any_cast<TCPConnection>(&g_fdm->getReadParameter(dc->d_socket))->state=TCPConnection::BYTE0;
572        struct timeval now; 
573        Utility::gettimeofday(&now, 0); // needs to be updated
574        g_fdm->setReadTTD(dc->d_socket, now, g_tcpTimeout);
575      }
576    }
577
578    if(!g_quiet) {
579      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
580      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
581        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
582    }
583   
584    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
585    float spent=makeFloat(sr.d_now-dc->d_now);
586    if(spent < 0.001)
587      g_stats.answers0_1++;
588    else if(spent < 0.010)
589      g_stats.answers1_10++;
590    else if(spent < 0.1)
591      g_stats.answers10_100++;
592    else if(spent < 1.0)
593      g_stats.answers100_1000++;
594    else
595      g_stats.answersSlow++;
596
597    uint64_t newLat=(uint64_t)(spent*1000000);
598    if(newLat < 1000000)  // outliers of several minutes exist..
599      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
600    delete dc;
601  }
602  catch(AhuException &ae) {
603    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
604  }
605  catch(MOADNSException& e) {
606    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
607  }
608  catch(exception& e) {
609    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
610  }
611  catch(...) {
612    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
613  }
614}
615
616RecursorControlChannel s_rcc;
617
618void makeControlChannelSocket()
619{
620  string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
621  if(::arg().mustDo("fork")) {
622    sockname+="."+lexical_cast<string>(Utility::getpid());
623    L<<Logger::Warning<<"Forked control socket name: "<<sockname<<endl;
624  }
625  s_rcc.listen(sockname);
626}
627
628void handleRunningTCPQuestion(int fd, boost::any& var)
629{
630  TCPConnection* conn=any_cast<TCPConnection>(&var);
631  if(conn->state==TCPConnection::BYTE0) {
632
633    int bytes=recv(conn->fd, conn->data, 2, 0);
634    if(bytes==1)
635      conn->state=TCPConnection::BYTE1;
636    if(bytes==2) { 
637      conn->qlen=(conn->data[0]<<8)+conn->data[1];
638      conn->bytesread=0;
639      conn->state=TCPConnection::GETQUESTION;
640    }
641    if(!bytes || bytes < 0) {
642      TCPConnection tmp(*conn); 
643      g_fdm->removeReadFD(fd);
644      tmp.closeAndCleanup();
645      return;
646    }
647  }
648  else if(conn->state==TCPConnection::BYTE1) {
649    int bytes=recv(conn->fd,conn->data+1,1,0);
650    if(bytes==1) {
651      conn->state=TCPConnection::GETQUESTION;
652      conn->qlen=(conn->data[0]<<8)+conn->data[1];
653      conn->bytesread=0;
654    }
655    if(!bytes || bytes < 0) {
656      if(g_logCommonErrors)
657        L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected after first byte"<<endl;
658      TCPConnection tmp(*conn); 
659      g_fdm->removeReadFD(fd);
660      tmp.closeAndCleanup();  // conn loses validity here..
661      return;
662    }
663  }
664  else if(conn->state==TCPConnection::GETQUESTION) {
665    int bytes=recv(conn->fd,conn->data + conn->bytesread,conn->qlen - conn->bytesread, 0);
666    if(!bytes || bytes < 0) {
667      L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected while reading question body"<<endl;
668      TCPConnection tmp(*conn);
669      g_fdm->removeReadFD(fd);
670      tmp.closeAndCleanup();  // conn loses validity here..
671
672      return;
673    }
674    conn->bytesread+=bytes;
675    if(conn->bytesread==conn->qlen) {
676      conn->state=TCPConnection::DONE;        // this makes us immune from timeouts, from now on *we* are responsible
677      DNSComboWriter* dc=0;
678      try {
679        dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
680      }
681      catch(MOADNSException &mde) {
682        g_stats.clientParseError++; 
683        L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->remote.toString() <<endl;
684        TCPConnection tmp(*conn); 
685        g_fdm->removeReadFD(fd);
686        tmp.closeAndCleanup();
687        return;
688      }
689     
690      dc->setSocket(conn->fd);
691      dc->d_tcp=true;
692      dc->setRemote(&conn->remote);
693      if(dc->d_mdp.d_header.qr)
694        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
695      else {
696        ++g_stats.qcounter;
697        ++g_stats.tcpqcounter;
698        MT->makeThread(startDoResolve, dc);
699        return;
700      }
701    }
702  }
703}
704
705//! Handle new incoming TCP connection
706void handleNewTCPQuestion(int fd, boost::any& )
707{
708  ComboAddress addr;
709  socklen_t addrlen=sizeof(addr);
710  int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
711  if(newsock>0) {
712    g_stats.addRemote(addr);
713    if(g_allowFrom && !g_allowFrom->match(&addr)) {
714      if(!g_quiet) 
715        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
716
717      g_stats.unauthorizedTCP++;
718      Utility::closesocket(newsock);
719      return;
720    }
721   
722    if(g_maxTCPPerClient && g_tcpClientCounts.count(addr) && g_tcpClientCounts[addr] >= g_maxTCPPerClient) {
723      g_stats.tcpClientOverflow++;
724      Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
725      return;
726    }
727    g_tcpClientCounts[addr]++;
728    Utility::setNonBlocking(newsock);
729    TCPConnection tc;
730    tc.fd=newsock;
731    tc.state=TCPConnection::BYTE0;
732    tc.remote=addr;
733    tc.startTime=g_now.tv_sec;
734    TCPConnection::s_currentConnections++;
735    g_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
736
737    struct timeval now;
738    Utility::gettimeofday(&now, 0);
739    g_fdm->setReadTTD(tc.fd, now, g_tcpTimeout);
740  }
741}
742 
743void handleNewUDPQuestion(int fd, boost::any& var)
744{
745  int len;
746  char data[1500];
747  ComboAddress fromaddr;
748  socklen_t addrlen=sizeof(fromaddr);
749
750  if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
751    g_stats.addRemote(fromaddr);
752    if(g_allowFrom && !g_allowFrom->match(&fromaddr)) {
753      if(!g_quiet) 
754        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
755
756      g_stats.unauthorizedUDP++;
757      return;
758    }
759    try {
760      DNSComboWriter* dc = new DNSComboWriter(data, len, g_now);
761      dc->setRemote(&fromaddr);
762     
763      if(dc->d_mdp.d_header.qr) {
764        if(g_logCommonErrors)
765          L<<Logger::Error<<"Ignoring answer from "<<dc->getRemote()<<" on server socket!"<<endl;
766      }
767      else {
768        ++g_stats.qcounter;
769        dc->setSocket(fd);
770        dc->d_tcp=false;
771        MT->makeThread(startDoResolve, (void*) dc);
772      }
773    }
774    catch(MOADNSException& mde) {
775      g_stats.clientParseError++; 
776      L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
777    }
778  }
779}
780
781typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
782deferredAdd_t deferredAdd;
783
784void makeTCPServerSockets()
785{
786  int fd;
787  vector<string>locals;
788  stringtok(locals,::arg()["local-address"]," ,");
789
790  if(locals.empty())
791    throw AhuException("No local address specified");
792 
793  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
794    ServiceTuple st;
795    st.port=::arg().asNum("local-port");
796    parseService(*i, st);
797   
798    ComboAddress sin;
799
800    memset((char *)&sin,0, sizeof(sin));
801    sin.sin4.sin_family = AF_INET;
802    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
803      sin.sin6.sin6_family = AF_INET6;
804      if(Utility::inet_pton(AF_INET6, st.host.c_str(), &sin.sin6.sin6_addr) <= 0)
805        throw AhuException("Unable to resolve local address for TCP server on '"+ st.host +"'"); 
806    }
807
808    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
809    if(fd<0) 
810      throw AhuException("Making a TCP server socket for resolver: "+stringerror());
811
812    int tmp=1;
813    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
814      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
815      exit(1);
816    }
817   
818#ifdef TCP_DEFER_ACCEPT
819    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
820      if(i==locals.begin())
821        L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
822    }
823#endif
824
825    sin.sin4.sin_port = htons(st.port);
826    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
827    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) 
828      throw AhuException("Binding TCP server socket for "+ st.host +": "+stringerror());
829   
830    Utility::setNonBlocking(fd);
831    setSendBuffer(fd, 65000);
832    listen(fd, 128);
833    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
834    g_tcpListenSockets.push_back(fd);
835
836    if(sin.sin4.sin_family == AF_INET) 
837      L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
838    else
839      L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
840  }
841}
842
843void makeUDPServerSockets()
844{
845  vector<string>locals;
846  stringtok(locals,::arg()["local-address"]," ,");
847
848  if(locals.empty())
849    throw AhuException("No local address specified");
850 
851  if(::arg()["local-address"]=="0.0.0.0") {
852    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
853  }
854
855  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
856    ServiceTuple st;
857    st.port=::arg().asNum("local-port");
858    parseService(*i, st);
859
860    ComboAddress sin;
861
862    memset(&sin, 0, sizeof(sin));
863    sin.sin4.sin_family = AF_INET;
864    if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
865      sin.sin6.sin6_family = AF_INET6;
866      if(Utility::inet_pton(AF_INET6, st.host.c_str(), &sin.sin6.sin6_addr) <= 0)
867        throw AhuException("Unable to resolve local address for UDP server on '"+ st.host +"'"); 
868    }
869   
870    int fd=socket(sin.sin4.sin_family, SOCK_DGRAM,0);
871    if(fd < 0) {
872      throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
873    }
874
875    setReceiveBuffer(fd, 200000);
876    sin.sin4.sin_port = htons(st.port);
877
878    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
879    if (::bind(fd, (struct sockaddr *)&sin, socklen)<0) 
880      throw AhuException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
881   
882    Utility::setNonBlocking(fd);
883    //    g_fdm->addReadFD(fd, handleNewUDPQuestion);
884    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
885
886    if(sin.sin4.sin_family == AF_INET) 
887      L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
888    else
889      L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
890  }
891}
892
893
894#ifndef WIN32
895void daemonize(void)
896{
897  if(fork())
898    exit(0); // bye bye
899 
900  setsid(); 
901
902  int i=open("/dev/null",O_RDWR); /* open stdin */
903  if(i < 0) 
904    L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
905  else {
906    dup2(i,0); /* stdin */
907    dup2(i,1); /* stderr */
908    dup2(i,2); /* stderr */
909    close(i);
910  }
911}
912#endif
913
914uint64_t counter;
915bool statsWanted;
916
917
918void usr1Handler(int)
919{
920  statsWanted=true;
921}
922
923
924
925void usr2Handler(int)
926{
927  SyncRes::setLog(true);
928  g_quiet=false;
929  ::arg().set("quiet")="no";
930
931}
932
933void doStats(void)
934{
935  if(g_stats.qcounter && (RC.cacheHits + RC.cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {
936    L<<Logger::Error<<"stats: "<<g_stats.qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
937     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits"<<endl;
938    L<<Logger::Error<<"stats: throttle map: "<<SyncRes::s_throttle.size()<<", ns speeds: "
939     <<SyncRes::s_nsSpeeds.size()<<endl; // ", bytes: "<<RC.bytes()<<endl;
940    L<<Logger::Error<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
941    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
942     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
943    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
944  }
945  else if(statsWanted) 
946    L<<Logger::Error<<"stats: no stats yet!"<<endl;
947
948  statsWanted=false;
949}
950
951static void houseKeeping(void *)
952try
953{
954  static time_t last_stat, last_rootupdate, last_prune;
955  struct timeval now;
956  Utility::gettimeofday(&now, 0);
957
958  if(now.tv_sec - last_prune > 300) { 
959    DTime dt;
960    dt.setTimeval(now);
961    RC.doPrune();
962   
963    typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
964    negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(SyncRes::s_negcache);
965
966    negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
967    ttdindex.erase(ttdindex.begin(), i);
968
969    time_t limit=now.tv_sec-300;
970    for(SyncRes::nsspeeds_t::iterator i = SyncRes::s_nsSpeeds.begin() ; i!= SyncRes::s_nsSpeeds.end(); )
971      if(i->second.stale(limit))
972        SyncRes::s_nsSpeeds.erase(i++);
973      else
974        ++i;
975
976    //   cerr<<"Pruned "<<pruned<<" records, left "<<SyncRes::s_negcache.size()<<"\n";
977//    cout<<"Prune took "<<dt.udiff()<<"usec\n";
978    last_prune=time(0);
979  }
980  if(now.tv_sec - last_stat>1800) { 
981    doStats();
982    last_stat=time(0);
983  }
984  if(now.tv_sec - last_rootupdate > 7200) {
985    SyncRes sr(now);
986    vector<DNSResourceRecord> ret;
987
988    sr.setNoCache();
989    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
990    if(!res) {
991      L<<Logger::Error<<"Refreshed . records"<<endl;
992      last_rootupdate=now.tv_sec;
993    }
994    else
995      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
996  }
997}
998catch(AhuException& ae)
999{
1000  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
1001  throw;
1002}
1003;
1004
/** Extract the question name and type from a raw DNS packet.

    Walks the label sequence that starts right after the 12-byte DNS header
    and returns it as a dotted name with a trailing dot. It returns "." for
    the root name, or when no complete label could be read.

    @param packet raw packet bytes
    @param len    number of valid bytes in packet
    @param type   out: the 16-bit QTYPE that follows the name, or 0 if the
                  name is not properly terminated or the two type bytes are
                  missing
    @return the question name

    Every read is bounds-checked against len, so truncated or malformed
    packets never cause reads past the end of the buffer. This includes
    packets shorter than the header, unterminated names, and labels that
    overrun the packet. Compression pointers are not interpreted: every
    length byte is taken literally. */
string questionExpand(const char* packet, uint16_t len, uint16_t& type)
{
  type=0;
  string ret;
  if(len < 12) {   // not even a complete DNS header
    ret=".";
    return ret;
  }

  const unsigned char* end=(const unsigned char*)packet+len;
  const unsigned char* pos=(const unsigned char*)packet+12;
  bool terminated=false;   // saw the zero-length label that ends the name
  ret.reserve(len-12);

  while(pos < end) {
    unsigned char labellen=*pos++;
    if(!labellen) {
      terminated=true;
      break;
    }
    if(labellen > end - pos)   // label runs past the end of the packet
      break;
    ret.append((const char*)pos, labellen);
    ret.append(1,'.');
    pos+=labellen;
  }
  if(ret.empty())
    ret=".";

  // the QTYPE directly follows the terminating zero byte, high byte first
  if(terminated && end - pos >= 2)
    type=(*pos)*256 + *(pos+1);

  return ret;
}
1028
1029
1030void handleRCC(int fd, boost::any& var)
1031{
1032  string remote;
1033  string msg=s_rcc.recv(&remote);
1034  RecursorControlParser rcp;
1035  RecursorControlParser::func_t* command;
1036  string answer=rcp.getAnswer(msg, &command);
1037  try {
1038    s_rcc.send(answer, &remote);
1039    command();
1040  }
1041  catch(exception& e) {
1042    L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1043  }
1044  catch(AhuException& ae) {
1045    L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1046  }
1047}
1048
1049void handleTCPClientReadable(int fd, boost::any& var)
1050{
1051  PacketID* pident=any_cast<PacketID>(&var);
1052  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1053
1054  shared_array<char> buffer(new char[pident->inNeeded]);
1055
1056  int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1057  if(ret > 0) {
1058    pident->inMSG.append(&buffer[0], &buffer[ret]);
1059    pident->inNeeded-=ret;
1060    if(!pident->inNeeded) {
1061      //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1062      PacketID pid=*pident;
1063      string msg=pident->inMSG;
1064     
1065      g_fdm->removeReadFD(fd);
1066      MT->sendEvent(pid, &msg); 
1067    }
1068    else {
1069      //      cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1070    }
1071  }
1072  else {
1073    PacketID tmp=*pident;
1074    g_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1075    string empty;
1076    MT->sendEvent(tmp, &empty); // this conveys error status
1077  }
1078}
1079
1080void handleTCPClientWritable(int fd, boost::any& var)
1081{
1082  PacketID* pid=any_cast<PacketID>(&var);
1083 
1084  int ret=send(fd, pid->outMSG.c_str(), pid->outMSG.size() - pid->outPos,0);
1085  if(ret > 0) {
1086    pid->outPos+=ret;
1087    if(pid->outPos==pid->outMSG.size()) {
1088      PacketID tmp=*pid;
1089      g_fdm->removeWriteFD(fd);
1090      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1091    }
1092  }
1093  else {  // error or EOF
1094    PacketID tmp(*pid);
1095    g_fdm->removeWriteFD(fd);
1096    string sent;
1097    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1098  }
1099}
1100
1101// resend event to everybody chained onto it
1102void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1103{
1104  if(iter->key.chain.empty())
1105    return;
1106
1107  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1108    resend.fd=-1;
1109    resend.id=*i;
1110    MT->sendEvent(resend, &content);
1111    g_stats.chainResends++;
1112    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<": "<< res <<endl;
1113  }
1114}
1115
1116void handleUDPServerResponse(int fd, boost::any& var)
1117{
1118  PacketID pid=any_cast<PacketID>(var);
1119  int len;
1120  char data[1500];
1121  ComboAddress fromaddr;
1122  socklen_t addrlen=sizeof(fromaddr);
1123
1124  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1125
1126  if(len < (int)sizeof(dnsheader)) {
1127    if(len < 0)
1128      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1129    else {
1130      g_stats.serverParseError++; 
1131      if(g_logCommonErrors)
1132        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr) <<
1133          ": packet smalller than DNS header"<<endl;
1134    }
1135
1136    g_udpclientsocks.returnSocket(fd);
1137    string empty;
1138
1139    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1140    if(iter != MT->d_waiters.end()) 
1141      doResends(iter, pid, empty);
1142   
1143    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1144    return;
1145  } 
1146
1147  dnsheader dh;
1148  memcpy(&dh, data, sizeof(dh));
1149 
1150  if(!dh.qdcount) // UPC, Nominum?
1151    return;
1152 
1153  if(dh.qr) {
1154    PacketID pident;
1155    pident.remote=fromaddr;
1156    pident.id=dh.id;
1157    pident.fd=fd;
1158    pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1159    string packet;
1160    packet.assign(data, len);
1161
1162    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1163    if(iter != MT->d_waiters.end()) {
1164      doResends(iter, pident, packet);
1165    }
1166
1167    if(!MT->sendEvent(pident, &packet)) {
1168//      if(g_logCommonErrors)
1169//        L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toString()<<": "<<pident.type<<endl;
1170      g_stats.unexpectedCount++;
1171     
1172      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1173        if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
1174           !Utility::strcasecmp(pident.domain.c_str(), mthread->key.domain.c_str())) {
1175          mthread->key.nearMisses++;
1176        }
1177      }
1178    }
1179    else if(fd >= 0)
1180      g_udpclientsocks.returnSocket(fd);
1181  }
1182  else
1183    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr)  <<endl;
1184}
1185
1186FDMultiplexer* getMultiplexer()
1187{
1188  FDMultiplexer* ret;
1189  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1190      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1191    try {
1192      ret=i->second();
1193      L<<Logger::Error<<"Enabled '"<<ret->getName()<<"' multiplexer"<<endl;
1194      return ret;
1195    }
1196    catch(FDMultiplexerException &fe) {
1197      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1198    }
1199    catch(...) {
1200      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1201    }
1202  }
1203  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1204  exit(1);
1205}
1206
1207static void makeNameToIPZone(const string& hostname, const string& ip)
1208{
1209  SyncRes::AuthDomain ad;
1210  DNSResourceRecord rr;
1211  rr.qname=toCanonic("", hostname);
1212  rr.d_place=DNSResourceRecord::ANSWER;
1213  rr.ttl=86400;
1214  rr.qtype=QType::SOA;
1215  rr.content="localhost. root 1 604800 86400 2419200 604800";
1216 
1217  ad.d_records.insert(rr);
1218
1219  rr.qtype=QType::NS;
1220  rr.content="localhost.";
1221
1222  ad.d_records.insert(rr);
1223 
1224  rr.qtype=QType::A;
1225  rr.content=ip;
1226  ad.d_records.insert(rr);
1227 
1228  if(SyncRes::s_domainmap.count(rr.qname)) {
1229    L<<Logger::Warning<<"Hosts file will not overwrite zone '"<<rr.qname<<"' already loaded"<<endl;
1230  }
1231  else {
1232    L<<Logger::Warning<<"Inserting forward zone '"<<rr.qname<<"' based on hosts file"<<endl;
1233    SyncRes::s_domainmap[rr.qname]=ad;
1234  }
1235}
1236
1237//! parts[0] must be an IP address, the rest must be host names
1238static void makeIPToNamesZone(const vector<string>& parts) 
1239{
1240  string address=parts[0];
1241  vector<string> ipparts;
1242  stringtok(ipparts, address,".");
1243 
1244  SyncRes::AuthDomain ad;
1245  DNSResourceRecord rr;
1246  for(int n=ipparts.size()-1; n>=0 ; --n) {
1247    rr.qname.append(ipparts[n]);
1248    rr.qname.append(1,'.');
1249  }
1250  rr.qname.append("in-addr.arpa.");
1251
1252  rr.d_place=DNSResourceRecord::ANSWER;
1253  rr.ttl=86400;
1254  rr.qtype=QType::SOA;
1255  rr.content="localhost. root. 1 604800 86400 2419200 604800";
1256 
1257  ad.d_records.insert(rr);
1258
1259  rr.qtype=QType::NS;
1260  rr.content="localhost.";
1261
1262  ad.d_records.insert(rr);
1263  rr.qtype=QType::PTR;
1264
1265  if(ipparts.size()==4)  // otherwise this is a partial zone
1266    for(unsigned int n=1; n < parts.size(); ++n) {
1267      rr.content=toCanonic("", parts[n]);
1268      ad.d_records.insert(rr);
1269    }
1270
1271  if(SyncRes::s_domainmap.count(rr.qname)) {
1272    L<<Logger::Warning<<"Will not overwrite zone '"<<rr.qname<<"' already loaded"<<endl;
1273  }
1274  else {
1275    if(ipparts.size()==4)
1276      L<<Logger::Warning<<"Inserting reverse zone '"<<rr.qname<<"' based on hosts file"<<endl;
1277    SyncRes::s_domainmap[rr.qname]=ad;
1278  }
1279}
1280
1281
1282void parseAuthAndForwards();
1283
1284string reloadAuthAndForwards()
1285{
1286  SyncRes::domainmap_t original=SyncRes::s_domainmap;
1287 
1288  try {
1289    L<<Logger::Warning<<"Reloading zones, purging data from cache"<<endl;
1290 
1291    for(SyncRes::domainmap_t::const_iterator i = SyncRes::s_domainmap.begin(); i != SyncRes::s_domainmap.end(); ++i) {
1292      for(SyncRes::AuthDomain::records_t::const_iterator j = i->second.d_records.begin(); j != i->second.d_records.end(); ++j) 
1293        RC.doWipeCache(j->qname);
1294    }
1295
1296    string configname=::arg()["config-dir"]+"/recursor.conf";
1297    cleanSlashes(configname);
1298   
1299    if(!::arg().preParseFile(configname.c_str(), "forward-zones")) 
1300      L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1301   
1302    ::arg().preParseFile(configname.c_str(), "auth-zones");
1303    ::arg().preParseFile(configname.c_str(), "export-etc-hosts");
1304    ::arg().preParseFile(configname.c_str(), "serve-rfc1918");
1305   
1306    parseAuthAndForwards();
1307   
1308    // purge again - new zones need to blank out the cache
1309    for(SyncRes::domainmap_t::const_iterator i = SyncRes::s_domainmap.begin(); i != SyncRes::s_domainmap.end(); ++i) {
1310      for(SyncRes::AuthDomain::records_t::const_iterator j = i->second.d_records.begin(); j != i->second.d_records.end(); ++j) 
1311        RC.doWipeCache(j->qname);
1312    }
1313
1314    // this is pretty blunt
1315    SyncRes::s_negcache.clear(); 
1316    return "ok\n";
1317  }
1318  catch(exception& e) {
1319    L<<Logger::Error<<"Had error reloading zones, keeping original data: "<<e.what()<<endl;
1320  }
1321  catch(AhuException& ae) {
1322    L<<Logger::Error<<"Encountered error reloading zones, keeping original data: "<<ae.reason<<endl;
1323  }
1324  catch(...) {
1325    L<<Logger::Error<<"Encountered unknown error reloading zones, keeping original data"<<endl;
1326  }
1327  SyncRes::s_domainmap.swap(original);
1328  return "reloading failed, see log\n";
1329}
1330
1331void parseAuthAndForwards()
1332{
1333  SyncRes::s_domainmap.clear(); // this makes us idempotent
1334
1335  TXTRecordContent::report();
1336
1337  typedef vector<string> parts_t;
1338  parts_t parts; 
1339  for(int n=0; n < 2 ; ++n ) {
1340    parts.clear();
1341    stringtok(parts, ::arg()[n ? "forward-zones" : "auth-zones"], ",\t\n\r");
1342    for(parts_t::const_iterator iter = parts.begin(); iter != parts.end(); ++iter) {
1343      SyncRes::AuthDomain ad;
1344      pair<string,string> headers=splitField(*iter, '=');
1345      trim(headers.first);
1346      trim(headers.second);
1347      headers.first=toCanonic("", headers.first);
1348      if(n==0) {
1349        L<<Logger::Error<<"Parsing authoritative data for zone '"<<headers.first<<"' from file '"<<headers.second<<"'"<<endl;
1350        ZoneParserTNG zpt(headers.second, headers.first);
1351        DNSResourceRecord rr;
1352        while(zpt.get(rr)) {
1353          try {
1354            string tmp=DNSRR2String(rr);
1355            rr=String2DNSRR(rr.qname, rr.qtype, tmp, 3600);
1356          }
1357          catch(exception &e) {
1358            throw AhuException("Error parsing record '"+rr.qname+"' of type "+rr.qtype.getName()+" in zone '"+headers.first+"' from file '"+headers.second+"': "+e.what());
1359          }
1360          catch(...) {
1361            throw AhuException("Error parsing record '"+rr.qname+"' of type "+rr.qtype.getName()+" in zone '"+headers.first+"' from file '"+headers.second+"'");
1362          }
1363
1364          ad.d_records.insert(rr);
1365
1366        }
1367      }
1368      else {
1369        L<<Logger::Error<<"Redirecting queries for zone '"<<headers.first<<"' to IP '"<<headers.second<<"'"<<endl;
1370        ad.d_server=headers.second;
1371      }
1372     
1373      SyncRes::s_domainmap[headers.first]=ad;
1374    }
1375  }
1376 
1377  if(::arg().mustDo("export-etc-hosts")) {
1378    string line;
1379    string fname;
1380   
1381    ifstream ifs("/etc/hosts");
1382    if(!ifs) {
1383      L<<Logger::Warning<<"Could not open /etc/hosts for reading"<<endl;
1384      return;
1385    }
1386   
1387    string::size_type pos;
1388    while(getline(ifs,line)) {
1389      pos=line.find('#');
1390      if(pos!=string::npos)
1391        line.resize(pos);
1392      trim(line);
1393      if(line.empty())
1394        continue;
1395      parts.clear();
1396      stringtok(parts, line, "\t\r\n ");
1397      if(parts[0].find(':')!=string::npos)
1398        continue;
1399     
1400      for(unsigned int n=1; n < parts.size(); ++n)
1401        makeNameToIPZone(parts[n], parts[0]);
1402      makeIPToNamesZone(parts);
1403    }
1404  }
1405  if(::arg().mustDo("serve-rfc1918")) {
1406    L<<Logger::Warning<<"Inserting rfc 1918 private space zones"<<endl;
1407    parts.clear();
1408    parts.push_back("127");
1409    makeIPToNamesZone(parts);
1410    parts[0]="10";
1411    makeIPToNamesZone(parts);
1412
1413    parts[0]="192.168";
1414    makeIPToNamesZone(parts);
1415    for(int n=16; n < 32; n++) {
1416      parts[0]="172."+lexical_cast<string>(n);
1417      makeIPToNamesZone(parts);
1418    }
1419  }
1420}
1421
1422int serviceMain(int argc, char*argv[])
1423{
1424  L.setName("pdns_recursor");
1425  L.setLoglevel((Logger::Urgency)(4));
1426
1427  L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2006 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1428#ifdef __GNUC__
1429  L<<", gcc "__VERSION__;
1430#endif // add other compilers here
1431#ifdef _MSC_VER
1432  L<<", MSVC "<<_MSC_VER;
1433#endif
1434  L<<") starting up"<<endl;
1435 
1436  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1437    "This is free software, and you are welcome to redistribute it "
1438    "according to the terms of the GPL version 2."<<endl;
1439 
1440  L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1441 
1442  if(!::arg()["allow-from"].empty()) {
1443    g_allowFrom=new NetmaskGroup;
1444    vector<string> ips;
1445    stringtok(ips, ::arg()["allow-from"], ", ");
1446    L<<Logger::Warning<<"Only allowing queries from: ";
1447    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1448      g_allowFrom->addMask(*i);
1449      if(i!=ips.begin())
1450        L<<Logger::Warning<<", ";
1451      L<<Logger::Warning<<*i;
1452    }
1453    L<<Logger::Warning<<endl;
1454  }
1455  else if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
1456    L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1457 
1458  g_quiet=::arg().mustDo("quiet");
1459  if(::arg().mustDo("trace")) {
1460    SyncRes::setLog(true);
1461    ::arg().set("quiet")="no";
1462    g_quiet=false;
1463  }
1464
1465  RC.d_followRFC2181=::arg().mustDo("auth-can-lower-ttl");
1466 
1467  if(!::arg()["query-local-address6"].empty()) {
1468    SyncRes::s_doIPv6=true;
1469    L<<Logger::Error<<"Enabling IPv6 transport for outgoing queries"<<endl;
1470  }
1471 
1472  SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1473  SyncRes::s_serverID=::arg()["server-id"];
1474  if(SyncRes::s_serverID.empty()) {
1475    char tmp[128];
1476    gethostname(tmp, sizeof(tmp)-1);
1477    SyncRes::s_serverID=tmp;
1478  }
1479 
1480 
1481  parseAuthAndForwards();
1482 
1483  g_stats.remotes.resize(::arg().asNum("remotes-ringbuffer-entries"));
1484  if(!g_stats.remotes.empty())
1485    memset(&g_stats.remotes[0], 0, g_stats.remotes.size() * sizeof(RecursorStats::remotes_t::value_type));
1486  g_logCommonErrors=::arg().mustDo("log-common-errors");
1487 
1488  makeUDPServerSockets();
1489  makeTCPServerSockets();
1490 
1491#ifndef WIN32
1492  if(::arg().mustDo("fork")) {
1493    fork();
1494    L<<Logger::Warning<<"This is forked pid "<<getpid()<<endl;
1495  }
1496#endif
1497 
1498  MT=new MTasker<PacketID,string>(100000);
1499  makeControlChannelSocket();       
1500  PacketID pident;
1501  primeHints();   
1502  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1503#ifndef WIN32
1504  if(::arg().mustDo("daemon")) {
1505    L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1506    L.toConsole(Logger::Critical);
1507    daemonize();
1508  }
1509  signal(SIGUSR1,usr1Handler);
1510  signal(SIGUSR2,usr2Handler);
1511  signal(SIGPIPE,SIG_IGN);
1512  writePid();
1513#endif
1514  g_fdm=getMultiplexer();
1515 
1516  for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
1517    g_fdm->addReadFD(i->first, i->second);
1518 
1519  int newgid=0;
1520  if(!::arg()["setgid"].empty())
1521    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1522  int newuid=0;
1523  if(!::arg()["setuid"].empty())
1524    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1525 
1526#ifndef WIN32
1527  if (!::arg()["chroot"].empty()) {
1528    if (chroot(::arg()["chroot"].c_str())<0) {
1529      L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1530      exit(1);
1531    }
1532  }
1533 
1534  Utility::dropPrivs(newuid, newgid);
1535  g_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1536#endif
1537 
1538  counter=0;
1539  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1540  g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1541 
1542  g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1543 
1544 
1545  bool listenOnTCP(true);
1546 
1547  for(;;) {
1548    while(MT->schedule(g_now.tv_sec)); // housekeeping, let threads do their thing
1549     
1550    if(!(counter%500)) {
1551      MT->makeThread(houseKeeping,0);
1552    }
1553
1554    if(!(counter%55)) {
1555      typedef vector<pair<int, boost::any> > expired_t;
1556      expired_t expired=g_fdm->getTimeouts(g_now);
1557       
1558      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1559        TCPConnection conn=any_cast<TCPConnection>(i->second);
1560        if(conn.state != TCPConnection::DONE) {
1561          if(g_logCommonErrors)
1562            L<<Logger::Warning<<"Timeout from remote TCP client "<< conn.remote.toString() <<endl;
1563          g_fdm->removeReadFD(i->first);
1564          conn.closeAndCleanup();
1565        }
1566      }
1567    }
1568     
1569    counter++;
1570
1571    if(statsWanted) {
1572      doStats();
1573    }
1574
1575    Utility::gettimeofday(&g_now, 0);
1576    g_fdm->run(&g_now);
1577
1578    if(listenOnTCP) {
1579      if(TCPConnection::s_currentConnections > maxTcpClients) {  // shutdown
1580        for(g_tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1581          g_fdm->removeReadFD(*i);
1582        listenOnTCP=false;
1583      }
1584    }
1585    else {
1586      if(TCPConnection::s_currentConnections <= maxTcpClients) {  // reenable
1587        for(g_tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1588          g_fdm->addReadFD(*i, handleNewTCPQuestion);
1589        listenOnTCP=true;
1590      }
1591    }
1592  }
1593}
#ifdef WIN32
/** Handle the Windows-only service management switches.

    - --register-service: registers the service, then exits with status 0,
      or with status 99 after an error message if registration failed.
    - --unregister-service: unregisters the service and exits with status 0.
    - With neither switch set, this returns and startup continues. */
void doWindowsServiceArguments(RecursorService& recursor)
{
  if(::arg().mustDo("register-service")) {
    int exitStatus=0;
    if(!recursor.registerService("The PowerDNS Recursor.", true)) {
      cerr<<"Could not register service."<<endl;
      exitStatus=99;
    }
    exit(exitStatus);
  }

  if(!::arg().mustDo("unregister-service"))
    return;

  recursor.unregisterService();
  exit(0);
}
#endif
1612
1613int main(int argc, char **argv) 
1614{
1615  reportBasicTypes();
1616
1617  int ret = EXIT_SUCCESS;
1618#ifdef WIN32
1619  RecursorService service;
1620  WSADATA wsaData;
1621  if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
1622    cerr<<"Unable to initialize winsock\n";
1623    exit(1);
1624  }
1625#endif // WIN32
1626
1627  try {
1628    Utility::srandom(time(0));
1629    ::arg().set("soa-minimum-ttl","Don't change")="0";
1630    ::arg().set("soa-serial-offset","Don't change")="0";
1631    ::arg().set("no-shuffle","Don't change")="off";
1632    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1633    ::arg().set("local-port","port to listen on")="53";
1634    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
1635    ::arg().set("trace","if we should output heaps of logging")="off";
1636    ::arg().set("daemon","Operate as a daemon")="yes";
1637    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1638    ::arg().set("chroot","switch to chroot jail")="";
1639    ::arg().set("setgid","If set, change group id to this gid for more security")="";
1640    ::arg().set("setuid","If set, change user id to this uid for more security")="";
1641#ifdef WIN32
1642    ::arg().set("quiet","Suppress logging of questions and answers")="off";
1643    ::arg().setSwitch( "register-service", "Register the service" )= "no";
1644    ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
1645    ::arg().setSwitch( "ntservice", "Run as service" )= "no";
1646    ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes"; 
1647    ::arg().setSwitch( "use-logfile", "Use a log file" )= "no"; 
1648    ::arg().setSwitch( "logfile", "Filename of the log file" )= "recursor.log"; 
1649#else
1650    ::arg().set("quiet","Suppress logging of questions and answers")="";
1651#endif
1652    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
1653    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
1654    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
1655    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
1656    ::arg().set("query-local-address6","Source IPv6 address for sending queries")="";
1657    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
1658    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
1659    ::arg().set("hint-file", "If set, load root hints from this file")="";
1660    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="0";
1661    ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
1662    ::arg().set("server-id", "Returned when queried for 'server.id' TXT, defaults to hostname")="";
1663    ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
1664    ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
1665    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")="127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10";
1666    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
1667    ::arg().set("fork", "If set, fork the daemon for possible double performance")="no";
1668    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
1669    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
1670    ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
1671    ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
1672    ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
1673    ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
1674    ::arg().set("auth-can-lower-ttl", "If we follow RFC 2181 to the letter, an authoritative server can lower the TTL of NS records")="off";
1675    ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off"; 
1676
1677    ::arg().setCmd("help","Provide a helpful message");
1678    ::arg().setCmd("config","Output blank configuration");
1679    L.toConsole(Logger::Warning);
1680    ::arg().laxParse(argc,argv); // do a lax parse
1681
1682    string configname=::arg()["config-dir"]+"/recursor.conf";
1683    cleanSlashes(configname);
1684
1685    if(!::arg().file(configname.c_str())) 
1686      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
1687
1688    ::arg().parse(argc,argv);
1689
1690    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
1691
1692    if(::arg().mustDo("help")) {
1693      cerr<<"syntax:"<<endl<<endl;
1694      cerr<<::arg().helpstring(::arg()["help"])<<endl;
1695      exit(99);
1696    }
1697
1698    if(::arg().mustDo("config")) {
1699      cout<<::arg().configstring()<<endl;
1700      exit(0);
1701    }
1702
1703#ifndef WIN32
1704    serviceMain(argc, argv);
1705#else
1706    doWindowsServiceArguments(service);
1707        L.toNTLog();
1708    RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" )); 
1709#endif
1710
1711  }
1712  catch(AhuException &ae) {
1713    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1714    ret=EXIT_FAILURE;
1715  }
1716  catch(exception &e) {
1717    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1718    ret=EXIT_FAILURE;
1719  }
1720  catch(...) {
1721    L<<Logger::Error<<"any other exception in main: "<<endl;
1722    ret=EXIT_FAILURE;
1723  }
1724 
1725#ifdef WIN32
1726  WSACleanup();
1727#endif // WIN32
1728
1729  return ret;
1730}
Note: See TracBrowser for help on using the browser.