root/trunk/pdns/pdns/pdns_recursor.cc @ 232

Revision 232, 16.7 KB (checked in by ahu, 9 years ago)

do timeout stats

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18*/
19
20#include "utility.hh"
21#include <iostream>
22#include <errno.h>
23#include <map>
24#include <set>
25#ifndef WIN32
26#include <netdb.h>
27#endif // WIN32
28#include <stdio.h>
29#include <signal.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include "mtasker.hh"
33#include <utility>
34#include "dnspacket.hh"
35#include "statbag.hh"
36#include "arguments.hh"
37#include "syncres.hh"
38#include <fcntl.h>
39#include <fstream>
40#include "recursor_cache.hh"
41
42MemRecursorCache RC;
43
44string s_programname="pdns_recursor";
45
46#ifndef WIN32
/* No-op replacements for the POSIX semaphore and pthread mutex primitives.
   Every stub ignores its arguments and reports success (0); pthread_self()
   always returns thread id 0.
   NOTE(review): presumably these exist so that locking code linked in from
   other PowerDNS objects costs nothing in this program - confirm. */
extern "C" {
  int sem_init(sem_t* sem, int pshared, unsigned int value) { return 0; }
  int sem_wait(sem_t* sem) { return 0; }
  int sem_trywait(sem_t* sem) { return 0; }
  int sem_post(sem_t* sem) { return 0; }
  int sem_getvalue(sem_t* sem, int* sval) { return 0; }

  pthread_t pthread_self(void) { return (pthread_t) 0; }

  int pthread_mutex_init(pthread_mutex_t*, const pthread_mutexattr_t*) { return 0; }
  int pthread_mutex_lock(pthread_mutex_t*) { return 0; }
  int pthread_mutex_unlock(pthread_mutex_t*) { return 0; }
}
59#endif // WIN32
60
61StatBag S;
62ArgvMap &arg()
63{
64  static ArgvMap theArg;
65  return theArg;
66}
67int d_clientsock;
68int d_serversock;
69int d_tcpserversock;
70
/* Key under which a waiting resolver thread is registered with the MTasker:
   the DNS message id plus the remote endpoint. */
struct PacketID
{
  u_int16_t id;
  struct sockaddr_in remote;
};

/* Ordering for PacketID: compares the id first, then the IPv4 address, then
   the port. Address and port are compared as the raw values stored in the
   sockaddr_in, without byte-order conversion. */
bool operator<(const PacketID& a, const PacketID& b)
{
  if(a.id != b.id)
    return a.id < b.id;

  if(a.remote.sin_addr.s_addr != b.remote.sin_addr.s_addr)
    return a.remote.sin_addr.s_addr < b.remote.sin_addr.s_addr;

  return a.remote.sin_port < b.remote.sin_port;
}
92
93MTasker<PacketID,string>* MT;
94
95/* these two functions are used by LWRes */
96int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
97{
98  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
99}
100
101int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
102{
103  PacketID pident;
104  pident.id=id;
105  memcpy(&pident.remote,toaddr,sizeof(pident.remote));
106
107  string packet;
108  if(!MT->waitEvent(pident,&packet,1)) { // timeout
109    return 0; 
110  }
111
112  *d_len=packet.size();
113  memcpy(data,packet.c_str(),min(len,*d_len));
114
115  return 1;
116}
117
118
119
120
121static void writePid(void)
122{
123  string fname=arg()["socket-dir"]+"/"+s_programname+".pid";
124  ofstream of(fname.c_str());
125  if(of)
126    of<<getpid()<<endl;
127  else
128    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
129}
130
131void primeHints(void)
132{
133  // prime root cache
134
135  static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
136                     "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
137  DNSResourceRecord arr, nsrr;
138  arr.qtype=QType::A;
139  arr.ttl=time(0)+3600000;
140  nsrr.qtype=QType::NS;
141  nsrr.ttl=time(0)+3600000;
142 
143  set<DNSResourceRecord>nsset;
144  for(char c='a';c<='m';++c) {
145    static char templ[40];
146    strncpy(templ,"a.root-servers.net", sizeof(templ) - 1);
147    *templ=c;
148    arr.qname=nsrr.content=templ;
149    arr.content=ips[c-'a'];
150    set<DNSResourceRecord>aset;
151    aset.insert(arr);
152    RC.replace(string(templ),QType(QType::A),aset);
153
154    nsset.insert(nsrr);
155  }
156  RC.replace("",QType(QType::NS),nsset);
157}
158
159void startDoResolve(void *p)
160{
161  try {
162    bool quiet=arg().mustDo("quiet");
163    DNSPacket P=*(DNSPacket *)p;
164
165    delete (DNSPacket *)p;
166
167    vector<DNSResourceRecord>ret;
168    DNSPacket *R=P.replyPacket();
169    R->setA(false);
170    R->setRA(true);
171
172    SyncRes sr;
173    if(!quiet)
174      L<<Logger::Error<<"["<<MT->getTid()<<"] question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
175
176    sr.setId(MT->getTid());
177    if(!P.d.rd)
178      sr.setCacheOnly();
179
180    int res=sr.beginResolve(P.qdomain, P.qtype, ret);
181    if(res<0)
182      R->setRcode(RCode::ServFail);
183    else {
184      R->setRcode(res);
185      for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i)
186        R->addRecord(*i);
187    }
188
189    const char *buffer=R->getData();
190    if(!R->getSocket())
191      sendto(d_serversock,buffer,R->len,0,(struct sockaddr *)(R->remote),R->d_socklen);
192    else {
193      char buf[2];
194      buf[0]=R->len/256;
195      buf[1]=R->len%256;
196      if(write(R->getSocket(),buf,2)!=2 || write(R->getSocket(),buffer,R->len)!=R->len)
197        L<<Logger::Error<<"Oops, partial answer sent to "<<P.getRemote()<<" - probably would have trouble receiving our answer anyhow (size="<<R->len<<")"<<endl;
198    }
199
200    if(!quiet) {
201      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
202      L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
203        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, rcode="<<res<<endl;
204    }
205   
206    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
207
208    delete R;
209  }
210  catch(AhuException &ae) {
211    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
212  }
213  catch(...) {
214    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
215  }
216}
217
218void makeClientSocket()
219{
220  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
221  if(d_clientsock<0) 
222    throw AhuException("Making a socket for resolver: "+stringerror());
223 
224  struct sockaddr_in sin;
225  memset((char *)&sin,0, sizeof(sin));
226 
227  sin.sin_family = AF_INET;
228  sin.sin_addr.s_addr = INADDR_ANY;
229 
230  int tries=10;
231  while(--tries) {
232    u_int16_t port=10000+Utility::random()%10000;
233    sin.sin_port = htons(port); 
234   
235    if (bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
236      break;
237   
238  }
239  if(!tries)
240    throw AhuException("Resolver binding to local socket: "+stringerror());
241}
242
243void makeTCPServerSocket()
244{
245  d_tcpserversock=socket(AF_INET, SOCK_STREAM,0);
246  if(d_tcpserversock<0) 
247    throw AhuException("Making a server socket for resolver: "+stringerror());
248 
249  struct sockaddr_in sin;
250  memset((char *)&sin,0, sizeof(sin));
251 
252  sin.sin_family = AF_INET;
253
254  if(arg()["local-address"]=="0.0.0.0") {
255    sin.sin_addr.s_addr = INADDR_ANY;
256  }
257  else {
258    if(!IpToU32(arg()["local-address"], &sin.sin_addr.s_addr))
259      throw AhuException("Unable to resolve local address"); 
260  }
261
262  sin.sin_port = htons(arg().asNum("local-port")); 
263   
264  if (bind(d_tcpserversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
265    throw AhuException("TCP Resolver binding to server socket: "+stringerror());
266 
267  int tmp=1;
268  if(setsockopt(d_tcpserversock,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
269    L<<Logger::Error<<"Setsockopt failed"<<endl;
270    exit(1); 
271  }
272
273  listen(d_tcpserversock, 128);
274}
275
276void makeServerSocket()
277{
278  d_serversock=socket(AF_INET, SOCK_DGRAM,0);
279  if(d_serversock<0) 
280    throw AhuException("Making a server socket for resolver: "+stringerror());
281 
282  struct sockaddr_in sin;
283  memset((char *)&sin,0, sizeof(sin));
284 
285  sin.sin_family = AF_INET;
286
287  if(arg()["local-address"]=="0.0.0.0") {
288    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
289    sin.sin_addr.s_addr = INADDR_ANY;
290  }
291  else {
292   
293    if(!IpToU32(arg()["local-address"], &sin.sin_addr.s_addr))
294      throw AhuException("Unable to resolve local address"); 
295
296  }
297
298  sin.sin_port = htons(arg().asNum("local-port")); 
299   
300  if (bind(d_serversock, (struct sockaddr *)&sin, sizeof(sin))<0) 
301    throw AhuException("Resolver binding to server socket: "+stringerror());
302  L<<Logger::Error<<"Incoming query source port: "<<arg().asNum("local-port")<<endl;
303}
304
305
306#ifndef WIN32
307void daemonize(void)
308{
309  if(fork())
310    exit(0); // bye bye
311 
312  setsid(); 
313
314  // cleanup open fds, but skip sockets
315  close(0);
316  close(1);
317  close(2);
318
319}
320#endif
321
int counter;       // main loop iteration count, used to schedule houseKeeping()
int qcounter;      // number of questions accepted from clients
bool statsWanted;  // set by the SIGUSR1 handler, cleared by doStats()

/* SIGUSR1 handler: only raises a flag; main() calls doStats() when it sees it. */
void usr1Handler(int /*signum*/)
{
  statsWanted = true;
}
329
330
331void doStats(void)
332{
333  if(qcounter) {
334   
335    L<<Logger::Error<<"stats: "<<qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
336     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits";
337    L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
338    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
339     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
340   
341  }
342  statsWanted=false;
343}
344
345void houseKeeping(void *)
346{
347  static time_t last_stat, last_rootupdate, last_prune;
348
349  if(time(0)-last_stat>60) { 
350    RC.doPrune();
351    last_prune=time(0);
352  }
353  if(time(0)-last_stat>1800) { 
354    doStats();
355    last_stat=time(0);
356  }
357  if(time(0)-last_rootupdate>7200) {
358    SyncRes sr;
359    vector<DNSResourceRecord>ret;
360
361    sr.setNoCache();
362    int res=sr.beginResolve("", QType(QType::NS), ret);
363    if(!res) {
364      L<<Logger::Error<<"Refreshed . records"<<endl;
365      last_rootupdate=time(0);
366    }
367    else
368      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
369  }
370}
371
/* Per-connection state for a client talking to us over TCP. A question is
   read in up to three steps, tracked by 'state': the first length byte, the
   second length byte, then the question body of 'qlen' bytes. */
struct TCPConnection
{
  enum State {BYTE0, BYTE1, GETQUESTION};

  int fd;                     // connected socket
  State state;                // which part of the message is expected next
  int qlen;                   // length of the question, from the two-byte prefix
  int bytesread;              // bytes of the question body received so far
  struct sockaddr_in remote;  // peer address as reported by accept()
  char data[65535];           // length prefix while it is being read, then the question body
};
381
382int main(int argc, char **argv) 
383{
384#ifdef WIN32
385    WSADATA wsaData;
386    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
387#endif // WIN32
388
389  try {
390    Utility::srandom(time(0));
391    arg().set("soa-minimum-ttl","Don't change")="0";
392    arg().set("soa-serial-offset","Don't change")="0";
393    arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
394    arg().set("local-port","port to listen on")="53";
395    arg().set("local-address","port to listen on")="0.0.0.0";
396    arg().set("trace","if we should output heaps of logging")="off";
397    arg().set("daemon","Operate as a daemon")="yes";
398    arg().set("quiet","Suppress logging of questions and answers")="off";
399    arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
400    arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
401    arg().set("delegation-only","Which domains we only accept delegations from")="";
402    arg().setCmd("help","Provide a helpful message");
403    L.toConsole(Logger::Warning);
404    arg().laxParse(argc,argv); // do a lax parse
405
406    string configname=arg()["config-dir"]+"/recursor.conf";
407    cleanSlashes(configname);
408
409    if(!arg().file(configname.c_str())) 
410      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
411
412    arg().parse(argc,argv);
413
414    arg().set("delegation-only")=toLower(arg()["delegation-only"]);
415
416    if(arg().mustDo("help")) {
417      cerr<<"syntax:"<<endl<<endl;
418      cerr<<arg().helpstring(arg()["help"])<<endl;
419      exit(99);
420    }
421
422    L.setName("pdns_recursor");
423
424    if(arg().mustDo("trace"))
425      SyncRes::setLog(true);
426   
427    makeClientSocket();
428    makeServerSocket();
429    makeTCPServerSocket();
430       
431    MT=new MTasker<PacketID,string>(100000);
432
433    char data[1500];
434    struct sockaddr_in fromaddr;
435   
436    PacketID pident;
437    primeHints();   
438    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
439#ifndef WIN32
440    if(arg().mustDo("daemon")) {
441      L.toConsole(Logger::Critical);
442      daemonize();
443    }
444    signal(SIGUSR1,usr1Handler);
445
446    writePid();
447#endif
448
449    vector<TCPConnection> tcpconnections;
450    counter=0;
451    for(;;) {
452      while(MT->schedule()); // housekeeping, let threads do their thing
453     
454      if(!((counter++)%100)) 
455        MT->makeThread(houseKeeping,0);
456      if(statsWanted)
457        doStats();
458
459      Utility::socklen_t addrlen=sizeof(fromaddr);
460      int d_len;
461      DNSPacket P;
462     
463      struct timeval tv;
464      tv.tv_sec=0;
465      tv.tv_usec=500000;
466     
467      fd_set readfds;
468      FD_ZERO( &readfds );
469      FD_SET( d_clientsock, &readfds );
470      FD_SET( d_serversock, &readfds );
471      FD_SET( d_tcpserversock, &readfds );
472      int fdmax=max(d_tcpserversock,max(d_clientsock,d_serversock));
473      for(vector<TCPConnection>::const_iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
474        FD_SET(i->fd, &readfds);
475        fdmax=max(fdmax,i->fd);
476      }
477
478
479      /* this should listen on a TCP port as well for new connections,  */
480      int selret = select(  fdmax + 1, &readfds, NULL, NULL, &tv );
481      if(selret<=0) 
482        if (selret == -1 && errno!=EINTR) 
483          throw AhuException("Select returned: "+stringerror());
484        else
485          continue;
486
487      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a question response?
488        d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
489        if(d_len<0) 
490          continue;
491       
492        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
493        if(P.parse(data,d_len)<0) {
494          L<<Logger::Error<<"Unparseable packet from remote server "<<P.getRemote()<<endl;
495        }
496        else { 
497          if(P.d.qr) {
498
499            pident.remote=fromaddr;
500            pident.id=P.d.id;
501            string packet;
502            packet.assign(data,d_len);
503            MT->sendEvent(pident,&packet);
504          }
505          else 
506            L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<P.getRemote()<<endl;
507        }
508      }
509     
510      if(FD_ISSET(d_serversock,&readfds)) { // do we have a new question on udp?
511        d_len=recvfrom(d_serversock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
512        if(d_len<0) 
513          continue;
514        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
515        if(P.parse(data,d_len)<0) {
516          L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
517        }
518        else { 
519          if(P.d.qr)
520            L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
521          else {
522            ++qcounter;
523            P.setSocket(0);
524            MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
525
526          }
527        }
528      }
529
530      if(FD_ISSET(d_tcpserversock,&readfds)) { // do we have a new TCP connection
531        struct sockaddr_in addr;
532        socklen_t addrlen=sizeof(addr);
533        int newsock=accept(d_tcpserversock, (struct sockaddr*)&addr, &addrlen);
534        Utility::setNonBlocking(newsock);
535       
536        if(newsock>0) {
537          TCPConnection tc;
538          tc.fd=newsock;
539          tc.state=TCPConnection::BYTE0;
540          tc.remote=addr;
541          tcpconnections.push_back(tc);
542        }
543      }
544
545      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
546        if(FD_ISSET(i->fd, &readfds)) {
547          if(i->state==TCPConnection::BYTE0) {
548            int bytes=read(i->fd,i->data,2);
549            if(bytes==1)
550              i->state=TCPConnection::BYTE1;
551            if(bytes==2) { 
552              i->qlen=(i->data[0]<<8)+i->data[1];
553              i->bytesread=0;
554              i->state=TCPConnection::GETQUESTION;
555            }
556            if(!bytes || bytes < 0) {
557              close(i->fd);
558              tcpconnections.erase(i);
559              break;
560            }
561          }
562          else if(i->state==TCPConnection::BYTE1) {
563            int bytes=read(i->fd,i->data+1,1);
564            if(bytes==1) {
565              i->state=TCPConnection::GETQUESTION;
566              i->qlen=(i->data[0]<<8)+i->data[1];
567              i->bytesread=0;
568            }
569            if(!bytes || bytes < 0) {
570              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
571              close(i->fd);
572              tcpconnections.erase(i);
573              break;
574            }
575           
576          }
577          else if(i->state==TCPConnection::GETQUESTION) {
578            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
579            if(!bytes || bytes < 0) {
580              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
581              close(i->fd);
582              tcpconnections.erase(i);
583              break;
584            }
585            i->bytesread+=bytes;
586            if(i->bytesread==i->qlen) {
587              i->state=TCPConnection::BYTE0;
588
589              if(P.parse(i->data,i->qlen)<0) {
590                L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
591                close(i->fd);
592                tcpconnections.erase(i);
593                break;
594              }
595              else { 
596                P.setSocket(i->fd);
597                P.setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
598                if(P.d.qr)
599                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
600                else {
601                  ++qcounter;
602                  MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
603                }
604              }
605            }
606          }
607        }
608      }
609    }
610  }
611  catch(AhuException &ae) {
612    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
613  }
614  catch(exception &e) {
615    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
616  }
617  catch(...) {
618    L<<Logger::Error<<"any other exception in main: "<<endl;
619  }
620 
621#ifdef WIN32
622  WSACleanup();
623#endif // WIN32
624
625  return 0;
626}
Note: See TracBrowser for help on using the browser.