root/trunk/pdns/pdns/pdns_recursor.cc @ 234

Revision 234, 16.8 KB (checked in by ahu, 9 years ago)

dynlistener now cleans up after itself
does slightly better error messages
added --version-string
improved pdns_recursor stats logging

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18*/
19
20#include "utility.hh"
21#include <iostream>
22#include <errno.h>
23#include <map>
24#include <set>
25#ifndef WIN32
26#include <netdb.h>
27#endif // WIN32
28#include <stdio.h>
29#include <signal.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include "mtasker.hh"
33#include <utility>
34#include "dnspacket.hh"
35#include "statbag.hh"
36#include "arguments.hh"
37#include "syncres.hh"
38#include <fcntl.h>
39#include <fstream>
40#include "recursor_cache.hh"
41
42MemRecursorCache RC;
43
44string s_programname="pdns_recursor";
45
46#ifndef WIN32
47extern "C" {
48  int sem_init(sem_t*, int, unsigned int){return 0;}
49  int sem_wait(sem_t*){return 0;}
50  int sem_trywait(sem_t*){return 0;}
51  int sem_post(sem_t*){return 0;}
52  int sem_getvalue(sem_t*, int*){return 0;}
53  pthread_t pthread_self(void){return (pthread_t) 0;}
54  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr){ return 0; }
55  int pthread_mutex_lock(pthread_mutex_t *mutex){ return 0; }
56  int pthread_mutex_unlock(pthread_mutex_t *mutex) { return 0; }
57
58}
59#endif // WIN32
60
61StatBag S;
62ArgvMap &arg()
63{
64  static ArgvMap theArg;
65  return theArg;
66}
67int d_clientsock;
68int d_serversock;
69int d_tcpserversock;
70
71struct PacketID
72{
73  u_int16_t id;
74  struct sockaddr_in remote;
75};
76
77bool operator<(const PacketID& a, const PacketID& b)
78{
79  if(a.id<b.id)
80    return true;
81
82  if(a.id==b.id) {
83    if(a.remote.sin_addr.s_addr < b.remote.sin_addr.s_addr)
84      return true;
85    if(a.remote.sin_addr.s_addr == b.remote.sin_addr.s_addr)
86      if(a.remote.sin_port < b.remote.sin_port)
87        return true;
88  }
89
90  return false;
91}
92
93MTasker<PacketID,string>* MT;
94
95/* these two functions are used by LWRes */
96int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
97{
98  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
99}
100
101int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
102{
103  PacketID pident;
104  pident.id=id;
105  memcpy(&pident.remote,toaddr,sizeof(pident.remote));
106
107  string packet;
108  if(!MT->waitEvent(pident,&packet,1)) { // timeout
109    return 0; 
110  }
111
112  *d_len=packet.size();
113  memcpy(data,packet.c_str(),min(len,*d_len));
114
115  return 1;
116}
117
118
119
120
121static void writePid(void)
122{
123  string fname=arg()["socket-dir"]+"/"+s_programname+".pid";
124  ofstream of(fname.c_str());
125  if(of)
126    of<<getpid()<<endl;
127  else
128    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
129}
130
131void primeHints(void)
132{
133  // prime root cache
134
135  static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
136                     "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
137  DNSResourceRecord arr, nsrr;
138  arr.qtype=QType::A;
139  arr.ttl=time(0)+3600000;
140  nsrr.qtype=QType::NS;
141  nsrr.ttl=time(0)+3600000;
142 
143  set<DNSResourceRecord>nsset;
144  for(char c='a';c<='m';++c) {
145    static char templ[40];
146    strncpy(templ,"a.root-servers.net", sizeof(templ) - 1);
147    *templ=c;
148    arr.qname=nsrr.content=templ;
149    arr.content=ips[c-'a'];
150    set<DNSResourceRecord>aset;
151    aset.insert(arr);
152    RC.replace(string(templ),QType(QType::A),aset);
153
154    nsset.insert(nsrr);
155  }
156  RC.replace("",QType(QType::NS),nsset);
157}
158
159void startDoResolve(void *p)
160{
161  try {
162    bool quiet=arg().mustDo("quiet");
163    DNSPacket P=*(DNSPacket *)p;
164
165    delete (DNSPacket *)p;
166
167    vector<DNSResourceRecord>ret;
168    DNSPacket *R=P.replyPacket();
169    R->setA(false);
170    R->setRA(true);
171
172    SyncRes sr;
173    if(!quiet)
174      L<<Logger::Error<<"["<<MT->getTid()<<"] question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
175
176    sr.setId(MT->getTid());
177    if(!P.d.rd)
178      sr.setCacheOnly();
179
180    int res=sr.beginResolve(P.qdomain, P.qtype, ret);
181    if(res<0)
182      R->setRcode(RCode::ServFail);
183    else {
184      R->setRcode(res);
185      for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i)
186        R->addRecord(*i);
187    }
188
189    const char *buffer=R->getData();
190    if(!R->getSocket())
191      sendto(d_serversock,buffer,R->len,0,(struct sockaddr *)(R->remote),R->d_socklen);
192    else {
193      char buf[2];
194      buf[0]=R->len/256;
195      buf[1]=R->len%256;
196      if(write(R->getSocket(),buf,2)!=2 || write(R->getSocket(),buffer,R->len)!=R->len)
197        L<<Logger::Error<<"Oops, partial answer sent to "<<P.getRemote()<<" - probably would have trouble receiving our answer anyhow (size="<<R->len<<")"<<endl;
198    }
199
200    if(!quiet) {
201      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
202      L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
203        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, rcode="<<res<<endl;
204    }
205   
206    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
207
208    delete R;
209  }
210  catch(AhuException &ae) {
211    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
212  }
213  catch(...) {
214    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
215  }
216}
217
218void makeClientSocket()
219{
220  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
221  if(d_clientsock<0) 
222    throw AhuException("Making a socket for resolver: "+stringerror());
223 
224  struct sockaddr_in sin;
225  memset((char *)&sin,0, sizeof(sin));
226 
227  sin.sin_family = AF_INET;
228  sin.sin_addr.s_addr = INADDR_ANY;
229 
230  int tries=10;
231  while(--tries) {
232    u_int16_t port=10000+Utility::random()%10000;
233    sin.sin_port = htons(port); 
234   
235    if (bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
236      break;
237   
238  }
239  if(!tries)
240    throw AhuException("Resolver binding to local socket: "+stringerror());
241}
242
243void makeTCPServerSocket()
244{
245  d_tcpserversock=socket(AF_INET, SOCK_STREAM,0);
246  if(d_tcpserversock<0) 
247    throw AhuException("Making a server socket for resolver: "+stringerror());
248 
249  struct sockaddr_in sin;
250  memset((char *)&sin,0, sizeof(sin));
251 
252  sin.sin_family = AF_INET;
253
254  if(arg()["local-address"]=="0.0.0.0") {
255    sin.sin_addr.s_addr = INADDR_ANY;
256  }
257  else {
258    if(!IpToU32(arg()["local-address"], &sin.sin_addr.s_addr))
259      throw AhuException("Unable to resolve local address"); 
260  }
261
262  sin.sin_port = htons(arg().asNum("local-port")); 
263   
264  if (bind(d_tcpserversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
265    throw AhuException("TCP Resolver binding to server socket: "+stringerror());
266 
267  int tmp=1;
268  if(setsockopt(d_tcpserversock,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
269    L<<Logger::Error<<"Setsockopt failed"<<endl;
270    exit(1); 
271  }
272
273  listen(d_tcpserversock, 128);
274}
275
276void makeServerSocket()
277{
278  d_serversock=socket(AF_INET, SOCK_DGRAM,0);
279  if(d_serversock<0) 
280    throw AhuException("Making a server socket for resolver: "+stringerror());
281 
282  struct sockaddr_in sin;
283  memset((char *)&sin,0, sizeof(sin));
284 
285  sin.sin_family = AF_INET;
286
287  if(arg()["local-address"]=="0.0.0.0") {
288    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
289    sin.sin_addr.s_addr = INADDR_ANY;
290  }
291  else {
292   
293    if(!IpToU32(arg()["local-address"], &sin.sin_addr.s_addr))
294      throw AhuException("Unable to resolve local address"); 
295
296  }
297
298  sin.sin_port = htons(arg().asNum("local-port")); 
299   
300  if (bind(d_serversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
301    throw AhuException("Resolver binding to server socket: "+stringerror());
302  L<<Logger::Error<<"Incoming query source port: "<<arg().asNum("local-port")<<endl;
303}
304
305
306#ifndef WIN32
307void daemonize(void)
308{
309  if(fork())
310    exit(0); // bye bye
311 
312  setsid(); 
313
314  // cleanup open fds, but skip sockets
315  close(0);
316  close(1);
317  close(2);
318
319}
320#endif
321
322int counter, qcounter;
323bool statsWanted;
324
325void usr1Handler(int)
326{
327  statsWanted=true;
328}
329
330
331void doStats(void)
332{
333  if(qcounter) {
334   
335    L<<Logger::Error<<"stats: "<<qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
336     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits";
337    L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
338    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
339     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
340    L<<Logger::Error<<"queries running: "<<MT->numProcesses()<<endl;
341   
342  }
343  statsWanted=false;
344}
345
346void houseKeeping(void *)
347{
348  static time_t last_stat, last_rootupdate, last_prune;
349
350  if(time(0)-last_stat>60) { 
351    RC.doPrune();
352    last_prune=time(0);
353  }
354  if(time(0)-last_stat>1800) { 
355    doStats();
356    last_stat=time(0);
357  }
358  if(time(0)-last_rootupdate>7200) {
359    SyncRes sr;
360    vector<DNSResourceRecord>ret;
361
362    sr.setNoCache();
363    int res=sr.beginResolve("", QType(QType::NS), ret);
364    if(!res) {
365      L<<Logger::Error<<"Refreshed . records"<<endl;
366      last_rootupdate=time(0);
367    }
368    else
369      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
370  }
371}
372
373struct TCPConnection
374{
375  int fd;
376  enum {BYTE0, BYTE1, GETQUESTION} state;
377  int qlen;
378  int bytesread;
379  struct sockaddr_in remote;
380  char data[65535];
381};
382
383int main(int argc, char **argv) 
384{
385#ifdef WIN32
386    WSADATA wsaData;
387    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
388#endif // WIN32
389
390  try {
391    Utility::srandom(time(0));
392    arg().set("soa-minimum-ttl","Don't change")="0";
393    arg().set("soa-serial-offset","Don't change")="0";
394    arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
395    arg().set("local-port","port to listen on")="53";
396    arg().set("local-address","port to listen on")="0.0.0.0";
397    arg().set("trace","if we should output heaps of logging")="off";
398    arg().set("daemon","Operate as a daemon")="yes";
399    arg().set("quiet","Suppress logging of questions and answers")="off";
400    arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
401    arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
402    arg().set("delegation-only","Which domains we only accept delegations from")="";
403    arg().setCmd("help","Provide a helpful message");
404    L.toConsole(Logger::Warning);
405    arg().laxParse(argc,argv); // do a lax parse
406
407    string configname=arg()["config-dir"]+"/recursor.conf";
408    cleanSlashes(configname);
409
410    if(!arg().file(configname.c_str())) 
411      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
412
413    arg().parse(argc,argv);
414
415    arg().set("delegation-only")=toLower(arg()["delegation-only"]);
416
417    if(arg().mustDo("help")) {
418      cerr<<"syntax:"<<endl<<endl;
419      cerr<<arg().helpstring(arg()["help"])<<endl;
420      exit(99);
421    }
422
423    L.setName("pdns_recursor");
424
425    if(arg().mustDo("trace"))
426      SyncRes::setLog(true);
427   
428    makeClientSocket();
429    makeServerSocket();
430    makeTCPServerSocket();
431       
432    MT=new MTasker<PacketID,string>(100000);
433
434    char data[1500];
435    struct sockaddr_in fromaddr;
436   
437    PacketID pident;
438    primeHints();   
439    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
440#ifndef WIN32
441    if(arg().mustDo("daemon")) {
442      L.toConsole(Logger::Critical);
443      daemonize();
444    }
445    signal(SIGUSR1,usr1Handler);
446
447    writePid();
448#endif
449
450    vector<TCPConnection> tcpconnections;
451    counter=0;
452    for(;;) {
453      while(MT->schedule()); // housekeeping, let threads do their thing
454     
455      if(!((counter++)%100)) 
456        MT->makeThread(houseKeeping,0);
457      if(statsWanted)
458        doStats();
459
460      Utility::socklen_t addrlen=sizeof(fromaddr);
461      int d_len;
462      DNSPacket P;
463     
464      struct timeval tv;
465      tv.tv_sec=0;
466      tv.tv_usec=500000;
467     
468      fd_set readfds;
469      FD_ZERO( &readfds );
470      FD_SET( d_clientsock, &readfds );
471      FD_SET( d_serversock, &readfds );
472      FD_SET( d_tcpserversock, &readfds );
473      int fdmax=max(d_tcpserversock,max(d_clientsock,d_serversock));
474      for(vector<TCPConnection>::const_iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
475        FD_SET(i->fd, &readfds);
476        fdmax=max(fdmax,i->fd);
477      }
478
479
480      /* this should listen on a TCP port as well for new connections,  */
481      int selret = select(  fdmax + 1, &readfds, NULL, NULL, &tv );
482      if(selret<=0) 
483        if (selret == -1 && errno!=EINTR) 
484          throw AhuException("Select returned: "+stringerror());
485        else
486          continue;
487
488      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a question response?
489        d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
490        if(d_len<0) 
491          continue;
492       
493        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
494        if(P.parse(data,d_len)<0) {
495          L<<Logger::Error<<"Unparseable packet from remote server "<<P.getRemote()<<endl;
496        }
497        else { 
498          if(P.d.qr) {
499
500            pident.remote=fromaddr;
501            pident.id=P.d.id;
502            string packet;
503            packet.assign(data,d_len);
504            MT->sendEvent(pident,&packet);
505          }
506          else 
507            L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<P.getRemote()<<endl;
508        }
509      }
510     
511      if(FD_ISSET(d_serversock,&readfds)) { // do we have a new question on udp?
512        d_len=recvfrom(d_serversock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
513        if(d_len<0) 
514          continue;
515        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
516        if(P.parse(data,d_len)<0) {
517          L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
518        }
519        else { 
520          if(P.d.qr)
521            L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
522          else {
523            ++qcounter;
524            P.setSocket(0);
525            MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
526
527          }
528        }
529      }
530
531      if(FD_ISSET(d_tcpserversock,&readfds)) { // do we have a new TCP connection
532        struct sockaddr_in addr;
533        socklen_t addrlen=sizeof(addr);
534        int newsock=accept(d_tcpserversock, (struct sockaddr*)&addr, &addrlen);
535        Utility::setNonBlocking(newsock);
536       
537        if(newsock>0) {
538          TCPConnection tc;
539          tc.fd=newsock;
540          tc.state=TCPConnection::BYTE0;
541          tc.remote=addr;
542          tcpconnections.push_back(tc);
543        }
544      }
545
546      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
547        if(FD_ISSET(i->fd, &readfds)) {
548          if(i->state==TCPConnection::BYTE0) {
549            int bytes=read(i->fd,i->data,2);
550            if(bytes==1)
551              i->state=TCPConnection::BYTE1;
552            if(bytes==2) { 
553              i->qlen=(i->data[0]<<8)+i->data[1];
554              i->bytesread=0;
555              i->state=TCPConnection::GETQUESTION;
556            }
557            if(!bytes || bytes < 0) {
558              close(i->fd);
559              tcpconnections.erase(i);
560              break;
561            }
562          }
563          else if(i->state==TCPConnection::BYTE1) {
564            int bytes=read(i->fd,i->data+1,1);
565            if(bytes==1) {
566              i->state=TCPConnection::GETQUESTION;
567              i->qlen=(i->data[0]<<8)+i->data[1];
568              i->bytesread=0;
569            }
570            if(!bytes || bytes < 0) {
571              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
572              close(i->fd);
573              tcpconnections.erase(i);
574              break;
575            }
576           
577          }
578          else if(i->state==TCPConnection::GETQUESTION) {
579            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
580            if(!bytes || bytes < 0) {
581              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
582              close(i->fd);
583              tcpconnections.erase(i);
584              break;
585            }
586            i->bytesread+=bytes;
587            if(i->bytesread==i->qlen) {
588              i->state=TCPConnection::BYTE0;
589
590              if(P.parse(i->data,i->qlen)<0) {
591                L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
592                close(i->fd);
593                tcpconnections.erase(i);
594                break;
595              }
596              else { 
597                P.setSocket(i->fd);
598                P.setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
599                if(P.d.qr)
600                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
601                else {
602                  ++qcounter;
603                  MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
604                }
605              }
606            }
607          }
608        }
609      }
610    }
611  }
612  catch(AhuException &ae) {
613    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
614  }
615  catch(exception &e) {
616    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
617  }
618  catch(...) {
619    L<<Logger::Error<<"any other exception in main: "<<endl;
620  }
621 
622#ifdef WIN32
623  WSACleanup();
624#endif // WIN32
625
626  return 0;
627}
Note: See TracBrowser for help on using the browser.