root/trunk/pdns/pdns/pdns_recursor.cc @ 186

Revision 186, 18.2 KB (checked in by ahu, 10 years ago)

mostly work on delegation-only, debian patches

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18*/
19
20#include "utility.hh"
21#include <iostream>
22#include <errno.h>
23#include <map>
24#include <set>
25#ifndef WIN32
26#include <netdb.h>
27#endif // WIN32
28#include <stdio.h>
29#include <signal.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include "mtasker.hh"
33#include <utility>
34#include "dnspacket.hh"
35#include "statbag.hh"
36#include "arguments.hh"
37#include "syncres.hh"
38#include <fcntl.h>
39#include <fstream>
40
41string s_programname="pdns_recursor";
42
43#ifndef WIN32
44extern "C" {
45  int sem_init(sem_t*, int, unsigned int){return 0;}
46  int sem_wait(sem_t*){return 0;}
47  int sem_trywait(sem_t*){return 0;}
48  int sem_post(sem_t*){return 0;}
49  int sem_getvalue(sem_t*, int*){return 0;}
50  pthread_t pthread_self(void){return (pthread_t) 0;}
51  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr){ return 0; }
52  int pthread_mutex_lock(pthread_mutex_t *mutex){ return 0; }
53  int pthread_mutex_unlock(pthread_mutex_t *mutex) { return 0; }
54
55}
56#endif // WIN32
57
58StatBag S;
59ArgvMap &arg()
60{
61  static ArgvMap theArg;
62  return theArg;
63}
64int d_clientsock;
65int d_serversock;
66int d_tcpserversock;
67
68struct PacketID
69{
70  u_int16_t id;
71  struct sockaddr_in remote;
72};
73
74bool operator<(const PacketID& a, const PacketID& b)
75{
76  if(a.id<b.id)
77    return true;
78
79  if(a.id==b.id) {
80    if(a.remote.sin_addr.s_addr < b.remote.sin_addr.s_addr)
81      return true;
82    if(a.remote.sin_addr.s_addr == b.remote.sin_addr.s_addr)
83      if(a.remote.sin_port < b.remote.sin_port)
84        return true;
85  }
86
87  return false;
88}
89
90MTasker<PacketID,string> MT(100000); // could probably be way lower
91
92/* these two functions are used by LWRes */
93int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
94{
95  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
96}
97
98int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
99{
100  PacketID pident;
101  pident.id=id;
102  memcpy(&pident.remote,toaddr,sizeof(pident.remote));
103
104  string packet;
105  if(!MT.waitEvent(pident,&packet,1)) { // timeout
106    return 0; 
107  }
108
109  *d_len=packet.size();
110  memcpy(data,packet.c_str(),min(len,*d_len));
111
112  return 1;
113}
114
115typedef map<string,set<DNSResourceRecord> > cache_t;
116static cache_t cache;
117int cacheHits, cacheMisses;
118int getCache(const string &qname, const QType& qt, set<DNSResourceRecord>* res)
119{
120  cache_t::const_iterator j=cache.find(toLower(qname)+"|"+qt.getName());
121  if(j!=cache.end() && j->first==toLower(qname)+"|"+qt.getName() && j->second.begin()->ttl>(unsigned int)time(0)) {
122    if(res)
123      *res=j->second;
124
125    return (unsigned int)j->second.begin()->ttl-time(0);
126  }
127
128  return -1;
129}
130
131void replaceCache(const string &qname, const QType& qt,  const set<DNSResourceRecord>& content)
132{
133  cache[toLower(qname)+"|"+qt.getName()]=content;
134}
135
136void doPrune(void)
137{
138  unsigned int names=0, records=0;
139
140  for(cache_t::iterator j=cache.begin();j!=cache.end();){
141    for(set<DNSResourceRecord>::iterator k=j->second.begin();k!=j->second.end();) 
142      if((unsigned int)k->ttl < (unsigned int)time(0)) {
143        j->second.erase(k++);
144        records++;
145      }
146      else
147        ++k;
148
149    if(j->second.empty()) { // everything is gone
150      //      L<<Logger::Error<<"Dropped name '"<<j->first<<"'"<<endl;
151      cache.erase(j++);
152      names++;
153
154    }
155    else {
156      //      L<<Logger::Error<<"Kept name '"<<j->first<<"'"<<endl;
157      ++j;
158    }
159  }
160  /*
161  if(names || records)
162    L<<Logger::Warning<<"Pruned "<<names<<" names, "<< records<<" records"<<endl;
163  */
164}
165
166
167static void writePid(void)
168{
169  string fname=arg()["socket-dir"]+"/"+s_programname+".pid";
170  ofstream of(fname.c_str());
171  if(of)
172    of<<getpid()<<endl;
173  else
174    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
175}
176
177void init(void)
178{
179  // prime root cache
180  static char*ips[]={"198.41.0.4", "128.9.0.107", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
181                     "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
182  DNSResourceRecord arr, nsrr;
183  arr.qtype=QType::A;
184  arr.ttl=time(0)+3600000;
185  nsrr.qtype=QType::NS;
186  nsrr.ttl=time(0)+3600000;
187 
188  set<DNSResourceRecord>nsset;
189  for(char c='a';c<='m';++c) {
190    static char templ[40];
191    strncpy(templ,"a.root-servers.net", sizeof(templ) - 1);
192    *templ=c;
193    arr.qname=nsrr.content=templ;
194    arr.content=ips[c-'a'];
195    set<DNSResourceRecord>aset;
196    aset.insert(arr);
197    replaceCache(string(templ),QType(QType::A),aset);
198
199    nsset.insert(nsrr);
200  }
201  replaceCache("",QType(QType::NS),nsset);
202}
203
204void startDoResolve(void *p)
205{
206  try {
207    bool quiet=arg().mustDo("quiet");
208    DNSPacket P=*(DNSPacket *)p;
209
210    delete (DNSPacket *)p;
211
212    vector<DNSResourceRecord>ret;
213    DNSPacket *R=P.replyPacket();
214    R->setA(false);
215    R->setRA(true);
216
217    SyncRes sr;
218    if(!quiet)
219      L<<Logger::Error<<"["<<MT.getTid()<<"] question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
220
221    sr.setId(MT.getTid());
222    if(!P.d.rd)
223      sr.setCacheOnly();
224
225    int res=sr.beginResolve(P.qdomain, P.qtype, ret);
226    if(res<0)
227      R->setRcode(RCode::ServFail);
228    else {
229      R->setRcode(res);
230      for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i)
231        R->addRecord(*i);
232    }
233
234    const char *buffer=R->getData();
235    if(!R->getSocket())
236      sendto(d_serversock,buffer,R->len,0,(struct sockaddr *)(R->remote),R->d_socklen);
237    else {
238      char buf[2];
239      buf[0]=R->len/256;
240      buf[1]=R->len%256;
241      if(write(R->getSocket(),buf,2)!=2 || write(R->getSocket(),buffer,R->len)!=R->len)
242        L<<Logger::Error<<"Oops, partial answer sent to "<<P.getRemote()<<" - probably would have trouble receiving our answer anyhow (size="<<R->len<<")"<<endl;
243    }
244
245    if(!quiet) {
246      L<<Logger::Error<<"["<<MT.getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
247      L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
248        sr.d_throttledqueries<<" throttled, rcode="<<res<<endl;
249    }
250   
251    sr.d_outqueries ? cacheMisses++ : cacheHits++;
252
253    delete R;
254  }
255  catch(AhuException &ae) {
256    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
257  }
258  catch(...) {
259    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
260  }
261}
262
263void makeClientSocket()
264{
265  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
266  if(d_clientsock<0) 
267    throw AhuException("Making a socket for resolver: "+stringerror());
268 
269  struct sockaddr_in sin;
270  memset((char *)&sin,0, sizeof(sin));
271 
272  sin.sin_family = AF_INET;
273  sin.sin_addr.s_addr = INADDR_ANY;
274 
275  int tries=10;
276  while(--tries) {
277    u_int16_t port=10000+Utility::random()%10000;
278    sin.sin_port = htons(port); 
279   
280    if (bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
281      break;
282   
283  }
284  if(!tries)
285    throw AhuException("Resolver binding to local socket: "+stringerror());
286}
287
288void makeTCPServerSocket()
289{
290  d_tcpserversock=socket(AF_INET, SOCK_STREAM,0);
291  if(d_tcpserversock<0) 
292    throw AhuException("Making a server socket for resolver: "+stringerror());
293 
294  struct sockaddr_in sin;
295  memset((char *)&sin,0, sizeof(sin));
296 
297  sin.sin_family = AF_INET;
298
299  if(arg()["local-address"]=="0.0.0.0") {
300    sin.sin_addr.s_addr = INADDR_ANY;
301  }
302  else {
303    struct hostent *h=0;
304    h=gethostbyname(arg()["local-address"].c_str());
305    if(!h)
306      throw AhuException("Unable to resolve local address"); 
307   
308    sin.sin_addr.s_addr=*(int*)h->h_addr;
309  }
310
311  sin.sin_port = htons(arg().asNum("local-port")); 
312   
313  if (bind(d_tcpserversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
314    throw AhuException("TCP Resolver binding to server socket: "+stringerror());
315 
316  int tmp=1;
317  if(setsockopt(d_tcpserversock,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
318    L<<Logger::Error<<"Setsockopt failed"<<endl;
319    exit(1); 
320  }
321
322  listen(d_tcpserversock, 128);
323}
324
325void makeServerSocket()
326{
327  d_serversock=socket(AF_INET, SOCK_DGRAM,0);
328  if(d_serversock<0) 
329    throw AhuException("Making a server socket for resolver: "+stringerror());
330 
331  struct sockaddr_in sin;
332  memset((char *)&sin,0, sizeof(sin));
333 
334  sin.sin_family = AF_INET;
335
336  if(arg()["local-address"]=="0.0.0.0") {
337    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
338    sin.sin_addr.s_addr = INADDR_ANY;
339  }
340  else {
341    struct hostent *h=0;
342    h=gethostbyname(arg()["local-address"].c_str());
343    if(!h)
344      throw AhuException("Unable to resolve local address"); 
345   
346    sin.sin_addr.s_addr=*(int*)h->h_addr;
347  }
348
349  sin.sin_port = htons(arg().asNum("local-port")); 
350   
351  if (bind(d_serversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
352    throw AhuException("Resolver binding to server socket: "+stringerror());
353  L<<Logger::Error<<"Incoming query source port: "<<arg().asNum("local-port")<<endl;
354}
355
356
357#ifndef WIN32
358void daemonize(void)
359{
360  if(fork())
361    exit(0); // bye bye
362 
363  setsid(); 
364
365  // cleanup open fds, but skip sockets
366  close(0);
367  close(1);
368  close(2);
369
370}
371#endif
372
373int counter, qcounter;
374bool statsWanted;
375
376void usr1Handler(int)
377{
378  statsWanted=true;
379}
380
381
382void doStats(void)
383{
384  if(qcounter) {
385    L<<Logger::Error<<"stats: "<<qcounter<<" questions, "<<cache.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
386     <<(int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits";
387    L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
388    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled"<<endl;
389  }
390  statsWanted=false;
391}
392
393void houseKeeping(void *)
394{
395  static time_t last_stat, last_rootupdate, last_prune;
396
397  if(time(0)-last_stat>30) { 
398    doPrune();
399    last_prune=time(0);
400  }
401  if(time(0)-last_stat>1800) { 
402    doStats();
403    last_stat=time(0);
404  }
405  if(time(0)-last_rootupdate>7200) {
406    SyncRes sr;
407    vector<DNSResourceRecord>ret;
408
409    sr.setNoCache();
410    int res=sr.beginResolve("", QType(QType::NS), ret);
411    if(!res) {
412      L<<Logger::Error<<"Refreshed . records"<<endl;
413      last_rootupdate=time(0);
414    }
415    else
416      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
417  }
418}
419
420struct TCPConnection
421{
422  int fd;
423  enum {BYTE0, BYTE1, GETQUESTION} state;
424  int qlen;
425  int bytesread;
426  struct sockaddr_in remote;
427  char data[65535];
428};
429
430int main(int argc, char **argv) 
431{
432#ifdef WIN32
433    WSADATA wsaData;
434    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
435#endif // WIN32
436
437  try {
438    Utility::srandom(time(0));
439    arg().set("soa-minimum-ttl","Don't change")="0";
440    arg().set("soa-serial-offset","Don't change")="0";
441    arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
442    arg().set("local-port","port to listen on")="53";
443    arg().set("local-address","port to listen on")="0.0.0.0";
444    arg().set("trace","if we should output heaps of logging")="off";
445    arg().set("daemon","Operate as a daemon")="yes";
446    arg().set("quiet","Suppress logging of questions and answers")="off";
447    arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
448    arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
449    arg().set("delegation-only","Which domains we only accept delegations from")="";
450    arg().setCmd("help","Provide a helpful message");
451    L.toConsole(Logger::Warning);
452    arg().laxParse(argc,argv); // do a lax parse
453
454    string configname=arg()["config-dir"]+"/recursor.conf";
455    cleanSlashes(configname);
456
457    if(!arg().file(configname.c_str())) 
458      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
459
460    arg().parse(argc,argv);
461
462    arg().set("delegation-only")=toLower(arg()["delegation-only"]);
463
464    if(arg().mustDo("help")) {
465      cerr<<"syntax:"<<endl<<endl;
466      cerr<<arg().helpstring(arg()["help"])<<endl;
467      exit(99);
468    }
469
470    L.setName("pdns_recursor");
471
472    if(arg().mustDo("trace"))
473      SyncRes::setLog(true);
474   
475    makeClientSocket();
476    makeServerSocket();
477    makeTCPServerSocket();
478       
479
480
481    char data[1500];
482    struct sockaddr_in fromaddr;
483   
484    PacketID pident;
485    init();   
486    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
487#ifndef WIN32
488    if(arg().mustDo("daemon")) {
489      L.toConsole(Logger::Critical);
490      daemonize();
491    }
492    signal(SIGUSR1,usr1Handler);
493
494    writePid();
495#endif
496
497    vector<TCPConnection> tcpconnections;
498    for(;;) {
499      while(MT.schedule()); // housekeeping, let threads do their thing
500     
501      if(!((counter++)%100)) 
502        MT.makeThread(houseKeeping,0);
503      if(statsWanted)
504        doStats();
505
506      Utility::socklen_t addrlen=sizeof(fromaddr);
507      int d_len;
508      DNSPacket P;
509     
510      struct timeval tv;
511      tv.tv_sec=0;
512      tv.tv_usec=500000;
513     
514      fd_set readfds;
515      FD_ZERO( &readfds );
516      FD_SET( d_clientsock, &readfds );
517      FD_SET( d_serversock, &readfds );
518      FD_SET( d_tcpserversock, &readfds );
519      int fdmax=max(d_tcpserversock,max(d_clientsock,d_serversock));
520      for(vector<TCPConnection>::const_iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
521        FD_SET(i->fd, &readfds);
522        fdmax=max(fdmax,i->fd);
523      }
524
525
526      /* this should listen on a TCP port as well for new connections,  */
527      int selret = select(  fdmax + 1, &readfds, NULL, NULL, &tv );
528      if(selret<=0) 
529        if (selret == -1 && errno!=EINTR) 
530          throw AhuException("Select returned: "+stringerror());
531        else
532          continue;
533
534      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a question response?
535        d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
536        if(d_len<0) 
537          continue;
538       
539        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
540        if(P.parse(data,d_len)<0) {
541          L<<Logger::Error<<"Unparseable packet from remote server "<<P.getRemote()<<endl;
542        }
543        else { 
544          if(P.d.qr) {
545
546            pident.remote=fromaddr;
547            pident.id=P.d.id;
548            string packet;
549            packet.assign(data,d_len);
550            MT.sendEvent(pident,&packet);
551          }
552          else 
553            L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<P.getRemote()<<endl;
554        }
555      }
556     
557      if(FD_ISSET(d_serversock,&readfds)) { // do we have a new question on udp?
558        d_len=recvfrom(d_serversock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
559        if(d_len<0) 
560          continue;
561        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
562        if(P.parse(data,d_len)<0) {
563          L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
564        }
565        else { 
566          if(P.d.qr)
567            L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
568          else {
569            ++qcounter;
570            P.setSocket(0);
571            MT.makeThread(startDoResolve,(void*)new DNSPacket(P));
572
573          }
574        }
575      }
576
577      if(FD_ISSET(d_tcpserversock,&readfds)) { // do we have a new TCP connection
578        struct sockaddr_in addr;
579        socklen_t addrlen=sizeof(addr);
580        int newsock=accept(d_tcpserversock, (struct sockaddr*)&addr, &addrlen);
581        Utility::setNonBlocking(newsock);
582       
583        if(newsock>0) {
584          TCPConnection tc;
585          tc.fd=newsock;
586          tc.state=TCPConnection::BYTE0;
587          tc.remote=addr;
588          L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&tc.remote,sizeof(tc.remote))<<" connected"<<endl;
589          tcpconnections.push_back(tc);
590        }
591      }
592
593      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
594        if(FD_ISSET(i->fd, &readfds)) {
595          if(i->state==TCPConnection::BYTE0) {
596            int bytes=read(i->fd,i->data,2);
597            if(bytes==1)
598              i->state=TCPConnection::BYTE1;
599            if(bytes==2) { 
600              i->qlen=(i->data[0]<<8)+i->data[1];
601              i->bytesread=0;
602              i->state=TCPConnection::GETQUESTION;
603            }
604            if(!bytes || bytes < 0) {
605              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected"<<endl;
606              close(i->fd);
607              tcpconnections.erase(i);
608              break;
609            }
610          }
611          else if(i->state==TCPConnection::BYTE1) {
612            int bytes=read(i->fd,i->data+1,1);
613            if(bytes==1) {
614              i->state=TCPConnection::GETQUESTION;
615              i->qlen=(i->data[0]<<8)+i->data[1];
616              i->bytesread=0;
617            }
618            if(!bytes || bytes < 0) {
619              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
620              close(i->fd);
621              tcpconnections.erase(i);
622              break;
623            }
624           
625          }
626          else if(i->state==TCPConnection::GETQUESTION) {
627            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
628            if(!bytes || bytes < 0) {
629              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
630              close(i->fd);
631              tcpconnections.erase(i);
632              break;
633            }
634            i->bytesread+=bytes;
635            if(i->bytesread==i->qlen) {
636              i->state=TCPConnection::BYTE0;
637
638              if(P.parse(i->data,i->qlen)<0) {
639                L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
640                close(i->fd);
641                tcpconnections.erase(i);
642                break;
643              }
644              else { 
645                P.setSocket(i->fd);
646                P.setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
647                if(P.d.qr)
648                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
649                else {
650                  ++qcounter;
651                  MT.makeThread(startDoResolve,(void*)new DNSPacket(P));
652                }
653              }
654            }
655          }
656        }
657      }
658    }
659  }
660  catch(AhuException &ae) {
661    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
662  }
663  catch(exception &e) {
664    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
665  }
666  catch(...) {
667    L<<Logger::Error<<"any other exception in main: "<<endl;
668  }
669 
670#ifdef WIN32
671  WSACleanup();
672#endif // WIN32
673
674  return 0;
675}
Note: See TracBrowser for help on using the browser.