root/trunk/pdns/pdns/pdns_recursor.cc @ 673

Revision 673, 33.8 KB (checked in by ahu, 7 years ago)

Try to prevent the 'Oops, sent partial answer' error, and make it more informative in case it does happen

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2006  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17*/
18
19#include "utility.hh"
20#include <iostream>
21#include <errno.h>
22#include <map>
23#include <set>
24#ifndef WIN32
25#include <netdb.h>
26#endif // WIN32
27#include "recursor_cache.hh"
28#include <stdio.h>
29#include <signal.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <netinet/tcp.h>
33#include "mtasker.hh"
34#include <utility>
35#include "arguments.hh"
36#include "syncres.hh"
37#include <fcntl.h>
38#include <fstream>
39#include "sstuff.hh"
40#include <boost/tuple/tuple.hpp>
41#include <boost/tuple/tuple_comparison.hpp>
42#include <boost/shared_array.hpp>
43#include <boost/lexical_cast.hpp>
44#include "dnsparser.hh"
45#include "dnswriter.hh"
46#include "dnsrecords.hh"
47#include "zoneparser-tng.hh"
48#include "rec_channel.hh"
49#include "logger.hh"
50#include "iputils.hh"
51
52#ifndef RECURSOR
53#include "statbag.hh"
54StatBag S;
55#endif
56
57
58using namespace boost;
59
60#ifdef __FreeBSD__           // see cvstrac ticket #26
61#include <pthread.h>
62#include <semaphore.h>
63#endif
64
65MemRecursorCache RC;
66RecursorStats g_stats;
67bool g_quiet;
68NetmaskGroup* g_allowFrom;
69string s_programname="pdns_recursor";
70
71struct DNSComboWriter {
72  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), d_tcp(false), d_socket(-1)
73  {}
74  MOADNSParser d_mdp;
75  void setRemote(struct sockaddr* sa, socklen_t len)
76  {
77    memcpy((void *)d_remote, (void *)sa, len);
78    d_socklen=len;
79  }
80
81  void setSocket(int sock)
82  {
83    d_socket=sock;
84  }
85
86  string getRemote() const
87  {
88    return sockAddrToString((struct sockaddr_in *)d_remote, d_socklen);
89  }
90
91  struct timeval d_now;
92  char d_remote[sizeof(sockaddr_in6)];
93  socklen_t d_socklen;
94  bool d_tcp;
95  int d_socket;
96};
97
98
#ifndef WIN32
#ifndef __FreeBSD__
/* No-op stand-ins for the POSIX semaphore and pthread mutex calls. Every one
   reports success (0) immediately without doing anything, and pthread_self()
   always returns thread id 0.
   NOTE(review): presumably this relies on all resolver work running in MTasker
   threads inside one OS thread, so no real locking is needed. Anything else in
   the process that relies on these primitives silently gets no locking. */
extern "C" {
  int sem_init(sem_t*, int, unsigned int)
  {
    return 0;
  }

  int sem_wait(sem_t*)
  {
    return 0;
  }

  int sem_trywait(sem_t*)
  {
    return 0;
  }

  int sem_post(sem_t*)
  {
    return 0;
  }

  int sem_getvalue(sem_t*, int*)
  {
    return 0;
  }

  pthread_t pthread_self(void)
  {
    return (pthread_t) 0;
  }

  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
  {
    return 0;
  }

  int pthread_mutex_lock(pthread_mutex_t *mutex)
  {
    return 0;
  }

  int pthread_mutex_unlock(pthread_mutex_t *mutex)
  {
    return 0;
  }

  int pthread_mutex_destroy(pthread_mutex_t *mutex)
  {
    return 0;
  }
}
#endif // __FreeBSD__
#endif // WIN32
115
116ArgvMap &arg()
117{
118  static ArgvMap theArg;
119  return theArg;
120}
121static int d_clientsock;
122static vector<int> d_udpserversocks;
123
124typedef vector<int> tcpserversocks_t;
125static tcpserversocks_t s_tcpserversocks;
126
127static map<int,PacketID> d_tcpclientreadsocks, d_tcpclientwritesocks;
128
129MTasker<PacketID,string>* MT;
130
131int asendtcp(const string& data, Socket* sock) 
132{
133  PacketID pident;
134  pident.sock=sock;
135  pident.outMSG=data;
136  string packet;
137
138  d_tcpclientwritesocks[sock->getHandle()]=pident;
139
140  int ret=MT->waitEvent(pident,&packet,1);
141  if(!ret || ret==-1) { // timeout
142    d_tcpclientwritesocks.erase(sock->getHandle());
143  }
144  return ret;
145}
146
147// -1 is error, 0 is timeout, 1 is success
148int arecvtcp(string& data, int len, Socket* sock) 
149{
150  data="";
151  PacketID pident;
152  pident.sock=sock;
153  pident.inNeeded=len;
154
155  d_tcpclientreadsocks[sock->getHandle()]=pident;
156
157  int ret=MT->waitEvent(pident,&data,1);
158  if(!ret || ret==-1) { // timeout
159    d_tcpclientreadsocks.erase(sock->getHandle());
160  }
161  return ret;
162}
163
164
165/* these two functions are used by LWRes */
166// -1 is error, > 1 is success
167int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
168{
169  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
170}
171
172// -1 is error, 0 is timeout, 1 is success
173int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
174{
175  PacketID pident;
176  pident.id=id;
177  memcpy(&pident.remote, toaddr, sizeof(pident.remote));
178
179  string packet;
180  int ret=MT->waitEvent(pident, &packet, 1);
181  if(ret > 0) {
182    *d_len=packet.size();
183    memcpy(data,packet.c_str(),min(len,*d_len));
184  }
185  return ret;
186}
187
188void setBuffer(int fd, int optname, uint32_t size)
189{
190  uint32_t psize=0;
191  socklen_t len=sizeof(psize);
192 
193  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
194    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
195    return; 
196  }
197
198  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
199    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
200}
201
202
203static void setReceiveBuffer(int fd, uint32_t size)
204{
205  setBuffer(fd, SO_RCVBUF, size);
206}
207
208static void setSendBuffer(int fd, uint32_t size)
209{
210  setBuffer(fd, SO_SNDBUF, size);
211}
212
213static void writePid(void)
214{
215  string fname=::arg()["socket-dir"]+"/"+s_programname+".pid";
216  ofstream of(fname.c_str());
217  if(of)
218    of<< getpid() <<endl;
219  else
220    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
221}
222
223void primeHints(void)
224{
225  // prime root cache
226  set<DNSResourceRecord>nsset;
227
228  if(::arg()["hint-file"].empty()) {
229    static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
230                       "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
231    DNSResourceRecord arr, nsrr;
232    arr.qtype=QType::A;
233    arr.ttl=time(0)+3600000;
234    nsrr.qtype=QType::NS;
235    nsrr.ttl=time(0)+3600000;
236   
237    for(char c='a';c<='m';++c) {
238      static char templ[40];
239      strncpy(templ,"a.root-servers.net.", sizeof(templ) - 1);
240      *templ=c;
241      arr.qname=nsrr.content=templ;
242      arr.content=ips[c-'a'];
243      set<DNSResourceRecord> aset;
244      aset.insert(arr);
245      RC.replace(string(templ), QType(QType::A), aset);
246     
247      nsset.insert(nsrr);
248    }
249  }
250  else {
251    ZoneParserTNG zpt(::arg()["hint-file"]);
252    DNSResourceRecord rr;
253    set<DNSResourceRecord> aset;
254
255    while(zpt.get(rr)) {
256      rr.ttl+=time(0);
257      if(rr.qtype.getCode()==QType::A) {
258        set<DNSResourceRecord> aset;
259        aset.insert(rr);
260        RC.replace(rr.qname, QType(QType::A), aset);
261      }
262      if(rr.qtype.getCode()==QType::NS) {
263        rr.content=toLower(rr.content);
264        nsset.insert(rr);
265      }
266    }
267  }
268  RC.replace(".", QType(QType::NS), nsset); // and stuff in the cache
269}
270
271void startDoResolve(void *p)
272{
273  try {
274    DNSComboWriter* dc=(DNSComboWriter *)p;
275
276    uint16_t maxudpsize=512;
277    MOADNSParser::EDNSOpts edo;
278    if(dc->d_mdp.getEDNSOpts(&edo)) {
279      maxudpsize=edo.d_packetsize;
280    }
281
282    vector<DNSResourceRecord> ret;
283   
284    vector<uint8_t> packet;
285    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
286
287    pw.getHeader()->aa=0;
288    pw.getHeader()->ra=1;
289    pw.getHeader()->qr=1;
290    pw.getHeader()->id=dc->d_mdp.d_header.id;
291    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
292
293    //    MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName());
294    SyncRes sr(dc->d_now);
295    if(!g_quiet)
296      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
297       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
298
299    sr.setId(MT->getTid());
300    if(!dc->d_mdp.d_header.rd)
301      sr.setCacheOnly();
302
303    int res=sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret);
304    if(res<0) {
305      pw.getHeader()->rcode=RCode::ServFail;
306      // no commit here, because no record
307      g_stats.servFails++;
308    }
309    else {
310      pw.getHeader()->rcode=res;
311      switch(res) {
312      case RCode::ServFail:
313        g_stats.servFails++;
314        break;
315      case RCode::NXDomain:
316        g_stats.nxDomains++;
317        break;
318      case RCode::NoError:
319        g_stats.noErrors++;
320        break;
321      }
322     
323      if(ret.size()) {
324        shuffle(ret);
325        for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i) {
326          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, 1, (DNSPacketWriter::Place)i->d_place);
327          shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), 1, i->content)); 
328          drc->toPacket(pw);
329          if(!dc->d_tcp && pw.size() > maxudpsize) {
330            pw.rollback();
331            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
332              pw.getHeader()->tc=1;
333            goto sendit; // need to jump over pw.commit
334          }
335        }
336        pw.commit();
337      }
338    }
339  sendit:;
340    if(!dc->d_tcp) {
341      sendto(dc->d_socket, &*packet.begin(), packet.size(), 0, (struct sockaddr *)(dc->d_remote), dc->d_socklen);
342    }
343    else {
344      char buf[2];
345      buf[0]=packet.size()/256;
346      buf[1]=packet.size()%256;
347
348      struct iovec iov[2];
349
350      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
351      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
352
353      int ret=writev(dc->d_socket, iov, 2);
354
355      if(ret <= 0 ) 
356        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< (ret ? strerror(errno) : "EOF") <<endl;
357      else if((unsigned int)ret != 2 + packet.size())
358        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
359    }
360
361    //    MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName());
362    if(!g_quiet) {
363      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
364      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
365        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
366    }
367   
368    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
369    float spent=makeFloat(sr.d_now-dc->d_now);
370    if(spent < 0.001)
371      g_stats.answers0_1++;
372    else if(spent < 0.010)
373      g_stats.answers1_10++;
374    else if(spent < 0.1)
375      g_stats.answers10_100++;
376    else if(spent < 1.0)
377      g_stats.answers100_1000++;
378    else
379      g_stats.answersSlow++;
380
381    uint64_t newLat=(uint64_t)(spent*1000000);
382    if(newLat < 1000000)  // outliers of several minutes exist..
383      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
384    delete dc;
385  }
386  catch(AhuException &ae) {
387    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
388  }
389  catch(exception& e) {
390    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
391  }
392  catch(...) {
393    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
394  }
395}
396
397RecursorControlChannel s_rcc;
398
399void makeControlChannelSocket()
400{
401  string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
402  if(::arg().mustDo("fork")) {
403    sockname+="."+lexical_cast<string>(getpid());
404    L<<Logger::Warning<<"Forked control socket name: "<<sockname<<endl;
405  }
406  s_rcc.listen(sockname);
407}
408
409void makeClientSocket()
410{
411  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
412  if(d_clientsock<0) 
413    throw AhuException("Making a socket for resolver: "+stringerror());
414  setReceiveBuffer(d_clientsock, 200000); 
415  struct sockaddr_in sin;
416  memset((char *)&sin,0, sizeof(sin));
417 
418  sin.sin_family = AF_INET;
419
420  if(!IpToU32(::arg()["query-local-address"], &sin.sin_addr.s_addr))
421    throw AhuException("Unable to resolve local address '"+ ::arg()["query-local-address"] +"'"); 
422
423  int tries=10;
424  while(--tries) {
425    uint16_t port=10000+Utility::random()%10000;
426    sin.sin_port = htons(port); 
427   
428    if (::bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
429      break;
430   
431  }
432  if(!tries)
433    throw AhuException("Resolver binding to local socket: "+stringerror());
434
435  Utility::setNonBlocking(d_clientsock);
436
437  L<<Logger::Error<<"Sending UDP queries from "<<inet_ntoa(sin.sin_addr)<<":"<< ntohs(sin.sin_port)  <<endl;
438}
439
440void makeTCPServerSockets()
441{
442  vector<string>locals;
443  stringtok(locals,::arg()["local-address"]," ,");
444
445  if(locals.empty())
446    throw AhuException("No local address specified");
447 
448  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
449    int fd=socket(AF_INET, SOCK_STREAM,0);
450    if(fd<0) 
451      throw AhuException("Making a server socket for resolver: "+stringerror());
452 
453    struct sockaddr_in sin;
454    memset((char *)&sin,0, sizeof(sin));
455   
456    sin.sin_family = AF_INET;
457    if(!IpToU32(*i, &sin.sin_addr.s_addr))
458      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
459
460    int tmp=1;
461    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
462      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
463      exit(1);
464    }
465   
466#ifdef TCP_DEFER_ACCEPT
467    if(setsockopt(fd,SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
468      L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
469    }
470#endif
471
472    sin.sin_port = htons(::arg().asNum("local-port")); 
473   
474    if (::bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
475      throw AhuException("Binding TCP server socket for "+*i+": "+stringerror());
476   
477    Utility::setNonBlocking(fd);
478    setSendBuffer(fd, 65000);
479    listen(fd, 128);
480    s_tcpserversocks.push_back(fd);
481    L<<Logger::Error<<"Listening for TCP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<::arg().asNum("local-port")<<endl;
482  }
483}
484
485void makeUDPServerSockets()
486{
487  vector<string>locals;
488  stringtok(locals,::arg()["local-address"]," ,");
489
490  if(locals.empty())
491    throw AhuException("No local address specified");
492 
493  if(::arg()["local-address"]=="0.0.0.0") {
494    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
495  }
496
497  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
498    int fd=socket(AF_INET, SOCK_DGRAM,0);
499    if(fd<0) 
500      throw AhuException("Making a server socket for resolver: "+stringerror());
501    setReceiveBuffer(fd, 200000);
502    struct sockaddr_in sin;
503    memset((char *)&sin,0, sizeof(sin));
504   
505    sin.sin_family = AF_INET;
506    if(!IpToU32(*i, &sin.sin_addr.s_addr))
507      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
508   
509    sin.sin_port = htons(::arg().asNum("local-port")); 
510   
511    if (::bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
512      throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror());
513   
514    Utility::setNonBlocking(fd);
515    d_udpserversocks.push_back(fd);
516    L<<Logger::Error<<"Listening for UDP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<::arg().asNum("local-port")<<endl;
517  }
518}
519
520
521#ifndef WIN32
522void daemonize(void)
523{
524  if(fork())
525    exit(0); // bye bye
526 
527  setsid(); 
528
529  // cleanup open fds, but skip sockets
530  close(0);
531  close(1);
532  close(2);
533}
534#endif
535
// Main-loop iteration counter; houseKeeping is started every 500 iterations.
uint64_t counter;
// Set by the SIGUSR1 handler to ask the main loop for a stats dump.
// NOTE(review): a flag written from a signal handler should really be
// 'volatile sig_atomic_t'; check for extern declarations before changing it.
bool statsWanted;
538
539
540void usr1Handler(int)
541{
542  statsWanted=true;
543}
544
545
546
547void usr2Handler(int)
548{
549  SyncRes::setLog(true);
550  g_quiet=false;
551  ::arg().set("quiet")="no";
552
553}
554
555void doStats(void)
556{
557  if(g_stats.qcounter) {
558    L<<Logger::Error<<"stats: "<<g_stats.qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
559     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits"<<endl;
560    L<<Logger::Error<<"stats: throttle map: "<<SyncRes::s_throttle.size()<<", ns speeds: "
561     <<SyncRes::s_nsSpeeds.size()<<endl; // ", bytes: "<<RC.bytes()<<endl;
562    L<<Logger::Error<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
563    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
564     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
565    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
566  }
567  else if(statsWanted) 
568    L<<Logger::Error<<"stats: no stats yet!"<<endl;
569
570  statsWanted=false;
571}
572
573static void houseKeeping(void *)
574{
575  static time_t last_stat, last_rootupdate, last_prune;
576  struct timeval now;
577  gettimeofday(&now, 0);
578
579  if(now.tv_sec - last_prune > 300) { 
580    DTime dt;
581    dt.setTimeval(now);
582    RC.doPrune();
583   
584    typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
585    negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(SyncRes::s_negcache);
586
587    negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
588    ttdindex.erase(ttdindex.begin(), i);
589
590    time_t limit=now.tv_sec-300;
591    for(SyncRes::nsspeeds_t::iterator i = SyncRes::s_nsSpeeds.begin() ; i!= SyncRes::s_nsSpeeds.end(); )
592      if(i->second.stale(limit))
593        SyncRes::s_nsSpeeds.erase(i++);
594      else
595        ++i;
596
597    //   cerr<<"Pruned "<<pruned<<" records, left "<<SyncRes::s_negcache.size()<<"\n";
598//    cout<<"Prune took "<<dt.udiff()<<"usec\n";
599    last_prune=time(0);
600  }
601  if(now.tv_sec - last_stat>1800) { 
602    doStats();
603    last_stat=time(0);
604  }
605  if(now.tv_sec -last_rootupdate>7200) {
606    SyncRes sr(now);
607    vector<DNSResourceRecord> ret;
608
609    sr.setNoCache();
610    int res=sr.beginResolve(".", QType(QType::NS), ret);
611    if(!res) {
612      L<<Logger::Error<<"Refreshed . records"<<endl;
613      last_rootupdate=now.tv_sec;
614    }
615    else
616      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
617  }
618}
619
// Open client TCP connections per remote IPv4 address (s_addr, network byte
// order); used to enforce max-tcp-per-client.
map<uint32_t, uint32_t> g_tcpClientCounts;

/* One accepted client TCP connection, tracked by the main loop.
   NOTE(review): the struct carries a 64 KB buffer and is copied by value into
   vectors in the main loop; consider storing pointers instead. */
struct TCPConnection
{
  int fd;
  enum {BYTE0, BYTE1, GETQUESTION} state;  // presumably: reading length byte 0, byte 1, then the question
  int qlen;
  int bytesread;
  struct sockaddr_in remote;
  char data[65535];
  time_t startTime;                        // accept time, compared against client-tcp-timeout

  // Close the socket and give back this connection's slot in
  // g_tcpClientCounts. The entry is erased when its count drops to zero, so the
  // map does not keep growing with one stale entry per client address seen.
  void closeAndCleanup()
  {
    close(fd);
    map<uint32_t, uint32_t>::iterator pos=g_tcpClientCounts.find(remote.sin_addr.s_addr);
    if(pos!=g_tcpClientCounts.end() && !--pos->second)
      g_tcpClientCounts.erase(pos);
  }
};
638
// Disabled debugging instrumentation: when enabled it overrides gettimeofday()
// to count, per distinct call stack (first 5 frames from backtrace()), how
// often gettimeofday() is called, keeping the counts in s_cases.
// NOTE(review): this block does not compile as-is if enabled. The statements
// right after the #include sit at file scope and presumably belonged inside a
// function that dumps *casesptr sorted by count.
639#if 0
640#include <execinfo.h>
641

// Dump the collected call-stack counts, lowest count first (see note above).
642  multimap<uint32_t,string> rev;
643  for(map<string,uint32_t>::const_iterator i=casesptr->begin(); i!=casesptr->end(); ++i) {
644    rev.insert(make_pair(i->second,i->first));
645  }
646  for(multimap<uint32_t,string>::const_iterator i=rev.begin(); i!= rev.end(); ++i)
647    cout<<i->first<<" times: \n"<<i->second<<"\n";
648
649  cout.flush();
650
651map<string,uint32_t>* casesptr;
// Return the current call stack (up to 5 frames) as newline-separated symbol strings.
// NOTE(review): 'strings' from backtrace_symbols() is never freed, and only 5 of
// the 20 'array' slots are used, contrary to the "last 17 functions" comment.
652static string maketrace()
653{
654  void *array[20]; //only care about last 17 functions (3 taken with tracing support)
655  size_t size;
656  char **strings;
657  size_t i;
658
659  size = backtrace (array, 5);
660  strings = backtrace_symbols (array, size); //Need -rdynamic gcc (linker) flag for this to work
661
662  string ret;
663
664  for (i = 0; i < size; i++) //skip useless functions
665    ret+=string(strings[i])+"\n";
666  return ret;
667}
668
669extern "C" {
670
// Replacement gettimeofday(): records the caller's stack in s_cases and fills
// in only tv_sec (from time(0)); tv_usec and __tz are left untouched.
671int gettimeofday (struct timeval *__restrict __tv,
672                  __timezone_ptr_t __tz)
673{
674  static map<string, uint32_t> s_cases;
675  casesptr=&s_cases;
676  s_cases[maketrace()]++;
677  __tv->tv_sec=time(0);
678  return 0;
679}
680
681}
682#endif
683
684int main(int argc, char **argv) 
685{
686  reportBasicTypes();
687
688  int ret = EXIT_SUCCESS;
689#ifdef WIN32
690    WSADATA wsaData;
691    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
692#endif // WIN32
693
694  try {
695    Utility::srandom(time(0));
696    ::arg().set("soa-minimum-ttl","Don't change")="0";
697    ::arg().set("soa-serial-offset","Don't change")="0";
698    ::arg().set("no-shuffle","Don't change")="off";
699    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
700    ::arg().set("local-port","port to listen on")="53";
701    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas")="0.0.0.0";
702    ::arg().set("trace","if we should output heaps of logging")="off";
703    ::arg().set("daemon","Operate as a daemon")="yes";
704    ::arg().set("chroot","switch to chroot jail")="";
705    ::arg().set("setgid","If set, change group id to this gid for more security")="";
706    ::arg().set("setuid","If set, change user id to this uid for more security")="";
707    ::arg().set("quiet","Suppress logging of questions and answers")="true";
708    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
709    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
710    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
711    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
712    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
713    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
714    ::arg().set("hint-file", "If set, load root hints from this file")="";
715    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="0";
716    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")="";
717    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
718    ::arg().set("fork", "If set, fork the daemon for possible double performance")="no";
719
720    ::arg().setCmd("help","Provide a helpful message");
721    L.toConsole(Logger::Warning);
722    ::arg().laxParse(argc,argv); // do a lax parse
723
724    string configname=::arg()["config-dir"]+"/recursor.conf";
725    cleanSlashes(configname);
726
727    if(!::arg().file(configname.c_str())) 
728      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
729
730    ::arg().parse(argc,argv);
731
732    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
733
734    if(::arg().mustDo("help")) {
735      cerr<<"syntax:"<<endl<<endl;
736      cerr<<::arg().helpstring(::arg()["help"])<<endl;
737      exit(99);
738    }
739
740    if(!::arg()["allow-from"].empty()) {
741      g_allowFrom=new NetmaskGroup;
742      vector<string> ips;
743      stringtok(ips, ::arg()["allow-from"], ", ");
744      for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i)
745        g_allowFrom->addMask(*i);
746    }
747
748    L.setName("pdns_recursor");
749
750    L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2006 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
751#ifdef __GNUC__
752    L<<", gcc "__VERSION__;
753#endif // add other compilers here
754    L<<") starting up"<<endl;
755
756    L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
757  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
758    "This is free software, and you are welcome to redistribute it "
759    "according to the terms of the GPL version 2."<<endl;
760
761
762  g_quiet=::arg().mustDo("quiet");
763  if(::arg().mustDo("trace")) {
764      SyncRes::setLog(true);
765      ::arg().set("quiet")="no";
766      g_quiet=false;
767  }
768
769    makeUDPServerSockets();
770    makeTCPServerSockets();
771
772    if(::arg().mustDo("fork")) {
773      fork();
774      L<<Logger::Warning<<"This is forked pid "<<getpid()<<endl;
775    }
776    makeClientSocket();
777
778    makeControlChannelSocket();
779   
780    MT=new MTasker<PacketID,string>(100000);
781
782    char data[1500];
783    struct sockaddr_in fromaddr;
784   
785    PacketID pident;
786    primeHints();   
787    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
788#ifndef WIN32
789    if(::arg().mustDo("daemon")) {
790      L.toConsole(Logger::Critical);
791      daemonize();
792    }
793    signal(SIGUSR1,usr1Handler);
794    signal(SIGUSR2,usr2Handler);
795    signal(SIGPIPE,SIG_IGN);
796
797    writePid();
798#endif
799
800    int newgid=0;
801    if(!::arg()["setgid"].empty())
802      newgid=Utility::makeGidNumeric(::arg()["setgid"]);
803    int newuid=0;
804    if(!::arg()["setuid"].empty())
805      newuid=Utility::makeUidNumeric(::arg()["setuid"]);
806
807
808    if (!::arg()["chroot"].empty()) {
809        if (chroot(::arg()["chroot"].c_str())<0) {
810            L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
811            exit(1);
812        }
813    }
814
815    Utility::dropPrivs(newuid, newgid);
816
817    vector<TCPConnection> tcpconnections;
818    counter=0;
819    struct timeval now;
820    unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
821    int tcpTimeout=::arg().asNum("client-tcp-timeout");
822
823    unsigned int maxTCPPerClient=::arg().asNum("max-tcp-per-client");
824
825    for(;;) {
826      while(MT->schedule()); // housekeeping, let threads do their thing
827     
828      if(!((counter++)%500)) 
829        MT->makeThread(houseKeeping,0);
830      if(statsWanted) {
831        doStats();
832      }
833
834      Utility::socklen_t addrlen=sizeof(fromaddr);
835      int d_len;
836     
837      struct timeval tv;
838      tv.tv_sec=0;
839      tv.tv_usec=500000;
840     
841      fd_set readfds, writefds;
842      FD_ZERO( &readfds );
843      FD_ZERO( &writefds );
844      FD_SET( d_clientsock, &readfds );
845      FD_SET( s_rcc.d_fd, &readfds);
846      int fdmax=max(d_clientsock, s_rcc.d_fd);
847
848      if(!tcpconnections.empty())
849        gettimeofday(&now, 0);
850
851      vector<TCPConnection> sweeped;
852
853      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
854        if(now.tv_sec < i->startTime + tcpTimeout) {
855          FD_SET(i->fd, &readfds);
856          fdmax=max(fdmax,i->fd);
857          sweeped.push_back(*i);
858        }
859        else {
860          L<<Logger::Error<<"TCP timeout from client "<<inet_ntoa(i->remote.sin_addr)<<endl;
861          i->closeAndCleanup();
862        }
863      }
864      sweeped.swap(tcpconnections);
865
866      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
867        FD_SET( *i, &readfds );
868        fdmax=max(fdmax,*i);
869      }
870      if(tcpconnections.size() < maxTcpClients) 
871        for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) {
872          FD_SET(*i, &readfds );
873          fdmax=max(fdmax,*i);
874        }
875
876      for(map<int,PacketID>::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) {
877        // cerr<<"Adding TCP socket "<<i->first<<" to read select set"<<endl;
878        FD_SET( i->first, &readfds );
879        fdmax=max(fdmax,i->first);
880      }
881
882      for(map<int,PacketID>::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) {
883        // cerr<<"Adding TCP socket "<<i->first<<" to write select set"<<endl;
884        FD_SET( i->first, &writefds );
885        fdmax=max(fdmax,i->first);
886      }
887
888      int selret = select(  fdmax + 1, &readfds, &writefds, NULL, &tv );
889      gettimeofday(&now, 0);
890      if(selret<=0) 
891        if (selret == -1 && errno!=EINTR) 
892          throw AhuException("Select returned: "+stringerror());
893        else
894          continue;
895
896      if(FD_ISSET(s_rcc.d_fd, &readfds)) {
897        string remote;
898        string msg=s_rcc.recv(&remote);
899        RecursorControlParser rcp;
900        RecursorControlParser::func_t* command;
901        string answer=rcp.getAnswer(msg, &command);
902        s_rcc.send(answer, &remote);
903        command();
904      }
905
906      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a UDP question response?
907        while((d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
908        try {
909            DNSComboWriter dc(data, d_len, now);
910            dc.setRemote((struct sockaddr *)&fromaddr, addrlen);
911
912            if(dc.d_mdp.d_header.qr) {
913              pident.remote=fromaddr;
914              pident.id=dc.d_mdp.d_header.id;
915              string packet;
916              packet.assign(data, d_len);
917              MT->sendEvent(pident, &packet);
918            }
919            else 
920              L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<dc.getRemote()<<endl;
921          }
922          catch(MOADNSException& mde) {
923            L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<": "<<mde.what()<<endl;
924          }
925        }
926      }
927     
928      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
929        if(FD_ISSET(*i,&readfds)) { // do we have a new question on udp?
930          while((d_len=recvfrom(*i, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
931            g_stats.queryrate.pulse(now);
932            if(g_allowFrom && !g_allowFrom->match(&fromaddr)) {
933              g_stats.unauthorizedUDP++;
934              continue;
935            }
936
937            try {
938              DNSComboWriter* dc = new DNSComboWriter(data, d_len, now);
939
940              dc->setRemote((struct sockaddr *)&fromaddr, addrlen);
941
942              if(dc->d_mdp.d_header.qr)
943                L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
944              else {
945                ++g_stats.qcounter;
946                dc->setSocket(*i);
947                dc->d_tcp=false;
948                MT->makeThread(startDoResolve, (void*) dc);
949              }
950            }
951            catch(MOADNSException& mde) {
952              L<<Logger::Error<<"Unable to parse packet from remote UDP client "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<": "<<mde.what()<<endl;
953            }
954          }
955        }
956      }
957
958      for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) { 
959        if(FD_ISSET(*i ,&readfds)) { // do we have a new TCP connection?
960          struct sockaddr_in addr;
961          socklen_t addrlen=sizeof(addr);
962          int newsock=accept(*i, (struct sockaddr*)&addr, &addrlen);
963          if(newsock>0) {
964            if(g_allowFrom && !g_allowFrom->match(&addr)) {
965              g_stats.unauthorizedTCP++;
966              close(newsock);
967              continue;
968            }
969
970            if(maxTCPPerClient && g_tcpClientCounts.count(addr.sin_addr.s_addr) && g_tcpClientCounts[addr.sin_addr.s_addr] >= maxTCPPerClient) {
971              g_stats.tcpClientOverflow++;
972              close(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
973              continue;
974            }
975            g_tcpClientCounts[addr.sin_addr.s_addr]++;
976            Utility::setNonBlocking(newsock);
977            TCPConnection tc;
978            tc.fd=newsock;
979            tc.state=TCPConnection::BYTE0;
980            tc.remote=addr;
981            tc.startTime=now.tv_sec;
982            tcpconnections.push_back(tc);
983          }
984        }
985      }
986
987      // have any question answers come in over TCP?
988      for(map<int,PacketID>::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) { 
989        bool haveErased=false;
990        if(FD_ISSET(i->first, &readfds)) { // can we receive
991          shared_array<char> buffer(new char[i->second.inNeeded]);
992
993          int ret=read(i->first, buffer.get(), min(i->second.inNeeded,200));
994          // cerr<<"Read returned "<<ret<<endl;
995          if(ret > 0) {
996            i->second.inMSG.append(&buffer[0], &buffer[ret]);
997            i->second.inNeeded-=ret;
998            if(!i->second.inNeeded) {
999              // cerr<<"Got entire load of "<<i->second.inMSG.size()<<" bytes"<<endl;
1000              PacketID pid=i->second;
1001              string msg=i->second.inMSG;
1002             
1003              d_tcpclientreadsocks.erase((i++));
1004              haveErased=true;
1005              MT->sendEvent(pid, &msg);   // XXX DODGY
1006            }
1007            else {
1008              // cerr<<"Still have "<<i->second.inNeeded<<" left to go"<<endl;
1009            }
1010          }
1011          else {
1012            //      cerr<<"when reading ret="<<ret<<endl;
1013            // XXX FIXME I think some stuff needs to happen here - like send an EOF event
1014          }
1015        }
1016        if(!haveErased)
1017          ++i;
1018      }
1019     
1020      // is there data we can send to remote nameservers over TCP?
1021      for(map<int,PacketID>::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) { 
1022        bool haveErased=false;
1023        if(FD_ISSET(i->first, &writefds)) { // can we send over TCP
1024          // cerr<<"Socket "<<i->first<<" available for writing"<<endl;
1025          int ret=write(i->first, i->second.outMSG.c_str(), i->second.outMSG.size() - i->second.outPos);
1026          if(ret > 0) {
1027            i->second.outPos+=ret;
1028            if(i->second.outPos==i->second.outMSG.size()) {
1029              // cerr<<"Sent out entire load of "<<i->second.outMSG.size()<<" bytes"<<endl;
1030              PacketID pid=i->second;
1031              d_tcpclientwritesocks.erase(i++);   // erase!
1032              haveErased=true;
1033              MT->sendEvent(pid, 0);
1034            }
1035
1036          }
1037          else { 
1038            //      cerr<<"ret="<<ret<<" when writing"<<endl;
1039            // XXX FIXME I think some stuff needs to happen here - like send an EOF event
1040          }
1041        }
1042        if(!haveErased)
1043          ++i;
1044      }
1045     
1046      // very braindead TCP incoming question parser
1047      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
1048        if(FD_ISSET(i->fd, &readfds)) {
1049          if(i->state==TCPConnection::BYTE0) {
1050            int bytes=read(i->fd,i->data,2);
1051            if(bytes==1)
1052              i->state=TCPConnection::BYTE1;
1053            if(bytes==2) { 
1054              i->qlen=(i->data[0]<<8)+i->data[1];
1055              i->bytesread=0;
1056              i->state=TCPConnection::GETQUESTION;
1057            }
1058            if(!bytes || bytes < 0) {
1059              i->closeAndCleanup();
1060              tcpconnections.erase(i);
1061              break;
1062            }
1063          }
1064          else if(i->state==TCPConnection::BYTE1) {
1065            int bytes=read(i->fd,i->data+1,1);
1066            if(bytes==1) {
1067              i->state=TCPConnection::GETQUESTION;
1068              i->qlen=(i->data[0]<<8)+i->data[1];
1069              i->bytesread=0;
1070            }
1071            if(!bytes || bytes < 0) {
1072              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
1073              i->closeAndCleanup();
1074              tcpconnections.erase(i);
1075              break;
1076            }
1077           
1078          }
1079          else if(i->state==TCPConnection::GETQUESTION) {
1080            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
1081            if(!bytes || bytes < 0) {
1082              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
1083              i->closeAndCleanup();
1084              tcpconnections.erase(i);
1085              break;
1086            }
1087            i->bytesread+=bytes;
1088            if(i->bytesread==i->qlen) {
1089              i->state=TCPConnection::BYTE0;
1090              DNSComboWriter* dc=0;
1091              try {
1092                dc=new DNSComboWriter(i->data, i->qlen, now);
1093              }
1094              catch(MOADNSException &mde) {
1095                L<<Logger::Error<<"Unable to parse packet from remote TCP client "<<sockAddrToString(&i->remote,sizeof(i->remote))<<endl;
1096                i->closeAndCleanup();
1097                tcpconnections.erase(i);
1098                break;
1099              }
1100
1101              dc->setSocket(i->fd);
1102              dc->d_tcp=true;
1103              dc->setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
1104              if(dc->d_mdp.d_header.qr)
1105                L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
1106              else {
1107                ++g_stats.qcounter;
1108                ++g_stats.tcpqcounter;
1109                MT->makeThread(startDoResolve, dc);
1110              }
1111            }
1112          }
1113        }
1114      }
1115    }
1116  }
1117  catch(AhuException &ae) {
1118    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1119    ret=EXIT_FAILURE;
1120  }
1121  catch(exception &e) {
1122    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1123    ret=EXIT_FAILURE;
1124  }
1125  catch(...) {
1126    L<<Logger::Error<<"any other exception in main: "<<endl;
1127    ret=EXIT_FAILURE;
1128  }
1129 
1130#ifdef WIN32
1131  WSACleanup();
1132#endif // WIN32
1133
1134  return ret;
1135}
Note: See TracBrowser for help on using the browser.