root/trunk/pdns/pdns/pdns_recursor.cc @ 439

Revision 439, 24.7 KB (checked in by ahu, 8 years ago)

simplified TCP outgoing query code in attempt to smoke out crasher bug

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2005  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17*/
18
19#include "utility.hh"
20#include <iostream>
21#include <errno.h>
22#include <map>
23#include <set>
24#ifndef WIN32
25#include <netdb.h>
26#endif // WIN32
27#include <stdio.h>
28#include <signal.h>
29#include <stdlib.h>
30#include <unistd.h>
31#include "mtasker.hh"
32#include <utility>
33#include "dnspacket.hh"
34#include "statbag.hh"
35#include "arguments.hh"
36#include "syncres.hh"
37#include <fcntl.h>
38#include <fstream>
39#include "sstuff.hh"
40#include <boost/tuple/tuple.hpp>
41#include <boost/tuple/tuple_comparison.hpp>
42#include <boost/shared_array.hpp>
43
44using namespace boost;
45
46#include "recursor_cache.hh"
47
48#ifdef __FreeBSD__           // see cvstrac ticket #26
49#include <pthread.h>
50#include <semaphore.h>
51#endif
52
53MemRecursorCache RC;
54
55string s_programname="pdns_recursor";
56
57#ifndef WIN32
58#ifndef __FreeBSD__
59extern "C" {
60  int sem_init(sem_t*, int, unsigned int){return 0;}
61  int sem_wait(sem_t*){return 0;}
62  int sem_trywait(sem_t*){return 0;}
63  int sem_post(sem_t*){return 0;}
64  int sem_getvalue(sem_t*, int*){return 0;}
65  pthread_t pthread_self(void){return (pthread_t) 0;}
66  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr){ return 0; }
67  int pthread_mutex_lock(pthread_mutex_t *mutex){ return 0; }
68  int pthread_mutex_unlock(pthread_mutex_t *mutex) { return 0; }
69
70}
71#endif // __FreeBSD__
72#endif // WIN32
73
74StatBag S;
75ArgvMap &arg()
76{
77  static ArgvMap theArg;
78  return theArg;
79}
80static int d_clientsock;
81static vector<int> d_udpserversocks;
82
83typedef vector<int> tcpserversocks_t;
84static tcpserversocks_t s_tcpserversocks;
85
86struct PacketID
87{
88  PacketID() : sock(0), inNeeded(0), outPos(0)
89  {}
90
91  u_int16_t id;  // wait for a specific id/remote paie
92  struct sockaddr_in remote;  // this is the remote
93
94  Socket* sock;  // or wait for an event on a TCP fd
95  int inNeeded; // if this is set, we'll read until inNeeded bytes are read
96  string inMSG; // they'll go here
97
98  string outMSG; // the outgoing message that needs to be sent
99  int outPos;    // how far we are along in the outMSG
100
101  bool operator<(const PacketID& b) const
102  {
103    int ourSock= sock ? sock->getHandle() : 0;
104    int bSock = b.sock ? b.sock->getHandle() : 0;
105    return 
106      tie(id, remote.sin_addr.s_addr, remote.sin_port, ourSock) <
107      tie(b.id, b.remote.sin_addr.s_addr, b.remote.sin_port, bSock);
108  }
109};
110
111static map<int,PacketID> d_tcpclientreadsocks, d_tcpclientwritesocks;
112
113MTasker<PacketID,string>* MT;
114
115int asendtcp(const string& data, Socket* sock) 
116{
117  PacketID pident;
118  pident.sock=sock;
119  pident.outMSG=data;
120  string packet;
121
122  //  cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
123  d_tcpclientwritesocks[sock->getHandle()]=pident;
124
125  if(!MT->waitEvent(pident,&packet,1)) { // timeout
126    d_tcpclientwritesocks.erase(sock->getHandle());
127    return 0; 
128  }
129  //  cerr<<"asendtcp happy"<<endl;
130  return 1;
131}
132
133int arecvtcp(string& data, int len, Socket* sock) 
134{
135  data="";
136  PacketID pident;
137  pident.sock=sock;
138  pident.inNeeded=len;
139
140  // cerr<<"arecvtcp called for "<<len<<" bytes"<<endl;
141  // cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
142  d_tcpclientreadsocks[sock->getHandle()]=pident;
143
144  if(!MT->waitEvent(pident,&data,1)) { // timeout
145    d_tcpclientreadsocks.erase(sock->getHandle());
146    return 0; 
147  }
148
149  // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
150  return 1;
151}
152
153
154/* these two functions are used by LWRes */
155int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
156{
157  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
158}
159
160int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
161{
162  PacketID pident;
163  pident.id=id;
164  memcpy(&pident.remote,toaddr,sizeof(pident.remote));
165
166  string packet;
167  if(!MT->waitEvent(pident,&packet,1)) { // timeout
168    return 0; 
169  }
170
171  *d_len=packet.size();
172  memcpy(data,packet.c_str(),min(len,*d_len));
173
174  return 1;
175}
176
177
178static void writePid(void)
179{
180  string fname=arg()["socket-dir"]+"/"+s_programname+".pid";
181  ofstream of(fname.c_str());
182  if(of)
183    of<<getpid()<<endl;
184  else
185    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
186}
187
188void primeHints(void)
189{
190  // prime root cache
191
192  static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
193                     "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
194  DNSResourceRecord arr, nsrr;
195  arr.qtype=QType::A;
196  arr.ttl=time(0)+3600000;
197  nsrr.qtype=QType::NS;
198  nsrr.ttl=time(0)+3600000;
199 
200  set<DNSResourceRecord>nsset;
201  for(char c='a';c<='m';++c) {
202    static char templ[40];
203    strncpy(templ,"a.root-servers.net", sizeof(templ) - 1);
204    *templ=c;
205    arr.qname=nsrr.content=templ;
206    arr.content=ips[c-'a'];
207    set<DNSResourceRecord>aset;
208    aset.insert(arr);
209    RC.replace(string(templ),QType(QType::A),aset);
210
211    nsset.insert(nsrr);
212  }
213  RC.replace("",QType(QType::NS),nsset); // and stuff in the cache
214}
215
216void startDoResolve(void *p)
217{
218  try {
219    bool quiet=arg().mustDo("quiet");
220    DNSPacket P=*(DNSPacket *)p;
221
222    delete (DNSPacket *)p;
223
224    vector<DNSResourceRecord>ret;
225    DNSPacket *R=P.replyPacket();
226    R->setA(false);
227    R->setRA(true);
228
229    SyncRes sr;
230    if(!quiet)
231      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (R->d_tcp ? "TCP " : "") << "question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
232
233    sr.setId(MT->getTid());
234    if(!P.d.rd)
235      sr.setCacheOnly();
236
237    int res=sr.beginResolve(P.qdomain, P.qtype, ret);
238    if(res<0)
239      R->setRcode(RCode::ServFail);
240    else {
241      R->setRcode(res);
242      for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i)
243        R->addRecord(*i);
244    }
245
246    const char *buffer=R->getData();
247
248    if(!R->d_tcp) {
249      if(R->len > 512) 
250        R->truncate(512);
251
252      sendto(R->getSocket(),buffer,R->len,0,(struct sockaddr *)(R->remote),R->d_socklen);
253    }
254    else {
255      char buf[2];
256      buf[0]=R->len/256;
257      buf[1]=R->len%256;
258
259      struct iovec iov[2];
260
261      iov[0].iov_base=(void*)buf;    iov[0].iov_len=2;
262      iov[1].iov_base=(void*)buffer; iov[1].iov_len = R->len;
263
264      int ret=writev(R->getSocket(), iov, 2);
265
266      if(ret <= 0 ) 
267        L<<Logger::Error<<"Error writing TCP answer to "<<P.getRemote()<<": "<< (ret ? strerror(errno) : "EOF") <<endl;
268      else if(ret != 2 + R->len)
269        L<<Logger::Error<<"Oops, partial answer sent to "<<P.getRemote()<<" - probably would have trouble receiving our answer anyhow (size="<<R->len<<")"<<endl;
270
271      //      if(write(R->getSocket(),buf,2)!=2 || write(R->getSocket(),buffer,R->len)!=R->len)
272      //  XXX FIXME write this writev fallback otherwise
273    }
274
275    if(!quiet) {
276      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
277      L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
278        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
279    }
280   
281    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
282
283    delete R;
284  }
285  catch(AhuException &ae) {
286    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
287  }
288  catch(...) {
289    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
290  }
291}
292
293void makeClientSocket()
294{
295  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
296  if(d_clientsock<0) 
297    throw AhuException("Making a socket for resolver: "+stringerror());
298 
299  struct sockaddr_in sin;
300  memset((char *)&sin,0, sizeof(sin));
301 
302  sin.sin_family = AF_INET;
303
304  if(!IpToU32(arg()["query-local-address"], &sin.sin_addr.s_addr))
305    throw AhuException("Unable to resolve local address '"+ arg()["query-local-address"] +"'"); 
306
307  int tries=10;
308  while(--tries) {
309    u_int16_t port=10000+Utility::random()%10000;
310    sin.sin_port = htons(port); 
311   
312    if (bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
313      break;
314   
315  }
316  if(!tries)
317    throw AhuException("Resolver binding to local socket: "+stringerror());
318
319  Utility::setNonBlocking(d_clientsock);
320  L<<Logger::Error<<"Sending UDP queries from "<<inet_ntoa(sin.sin_addr)<<":"<< ntohs(sin.sin_port)  <<endl;
321}
322
323void makeTCPServerSockets()
324{
325  vector<string>locals;
326  stringtok(locals,arg()["local-address"]," ,");
327
328  if(locals.empty())
329    throw AhuException("No local address specified");
330 
331  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
332    int fd=socket(AF_INET, SOCK_STREAM,0);
333    if(fd<0) 
334      throw AhuException("Making a server socket for resolver: "+stringerror());
335 
336    struct sockaddr_in sin;
337    memset((char *)&sin,0, sizeof(sin));
338   
339    sin.sin_family = AF_INET;
340    if(!IpToU32(*i, &sin.sin_addr.s_addr))
341      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
342
343    int tmp=1;
344    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
345      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
346      exit(1); 
347    }
348   
349    sin.sin_port = htons(arg().asNum("local-port")); 
350   
351    if (bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
352      throw AhuException("Binding TCP server socket for "+*i+": "+stringerror());
353   
354    Utility::setNonBlocking(fd);
355    listen(fd, 128);
356    s_tcpserversocks.push_back(fd);
357    L<<Logger::Error<<"Listening for TCP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<arg().asNum("local-port")<<endl;
358  }
359}
360
361void makeUDPServerSockets()
362{
363  vector<string>locals;
364  stringtok(locals,arg()["local-address"]," ,");
365
366  if(locals.empty())
367    throw AhuException("No local address specified");
368 
369  if(arg()["local-address"]=="0.0.0.0") {
370    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
371  }
372
373  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
374    int fd=socket(AF_INET, SOCK_DGRAM,0);
375    if(fd<0) 
376      throw AhuException("Making a server socket for resolver: "+stringerror());
377 
378    struct sockaddr_in sin;
379    memset((char *)&sin,0, sizeof(sin));
380   
381    sin.sin_family = AF_INET;
382    if(!IpToU32(*i, &sin.sin_addr.s_addr))
383      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
384   
385    sin.sin_port = htons(arg().asNum("local-port")); 
386   
387    if (bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
388      throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror());
389   
390    Utility::setNonBlocking(fd);
391    d_udpserversocks.push_back(fd);
392    L<<Logger::Error<<"Listening for UDP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<arg().asNum("local-port")<<endl;
393  }
394}
395
396
397#ifndef WIN32
398void daemonize(void)
399{
400  if(fork())
401    exit(0); // bye bye
402 
403  setsid(); 
404
405  // cleanup open fds, but skip sockets
406  close(0);
407  close(1);
408  close(2);
409
410}
411#endif
412
413int counter, qcounter;
414bool statsWanted;
415
416void usr1Handler(int)
417{
418  statsWanted=true;
419}
420
421
422void doStats(void)
423{
424  if(qcounter) {
425    L<<Logger::Error<<"stats: "<<qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
426     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits";
427    L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
428    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
429     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
430    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
431  }
432  else if(statsWanted) 
433    L<<Logger::Error<<"stats: no stats yet!"<<endl;
434
435  statsWanted=false;
436}
437
438void houseKeeping(void *)
439{
440  static time_t last_stat, last_rootupdate, last_prune;
441  time_t now=time(0);
442  if(now - last_prune > 60) { 
443    RC.doPrune();
444    last_prune=time(0);
445  }
446  if(now - last_stat>1800) { 
447    doStats();
448    last_stat=time(0);
449  }
450  if(now -last_rootupdate>7200) {
451    SyncRes sr;
452    vector<DNSResourceRecord>ret;
453
454    sr.setNoCache();
455    int res=sr.beginResolve("", QType(QType::NS), ret);
456    if(!res) {
457      L<<Logger::Error<<"Refreshed . records"<<endl;
458      last_rootupdate=now;
459    }
460    else
461      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
462  }
463}
464
465struct TCPConnection
466{
467  int fd;
468  enum {BYTE0, BYTE1, GETQUESTION} state;
469  int qlen;
470  int bytesread;
471  struct sockaddr_in remote;
472  char data[65535];
473  time_t startTime;
474};
475
476int main(int argc, char **argv) 
477{
478  int ret = EXIT_SUCCESS;
479#ifdef WIN32
480    WSADATA wsaData;
481    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
482#endif // WIN32
483
484  try {
485    Utility::srandom(time(0));
486    arg().set("soa-minimum-ttl","Don't change")="0";
487    arg().set("soa-serial-offset","Don't change")="0";
488    arg().set("no-shuffle","Don't change")="off";
489    arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
490    arg().set("local-port","port to listen on")="53";
491    arg().set("local-address","IP addresses to listen on, separated by spaces or commas")="0.0.0.0";
492    arg().set("trace","if we should output heaps of logging")="off";
493    arg().set("daemon","Operate as a daemon")="yes";
494    arg().set("chroot","switch to chroot jail")="";
495    arg().set("setgid","If set, change group id to this gid for more security")="";
496    arg().set("setuid","If set, change user id to this uid for more security")="";
497    arg().set("quiet","Suppress logging of questions and answers")="off";
498    arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
499    arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
500    arg().set("delegation-only","Which domains we only accept delegations from")="";
501    arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
502    arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
503    arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
504
505    arg().setCmd("help","Provide a helpful message");
506    L.toConsole(Logger::Warning);
507    arg().laxParse(argc,argv); // do a lax parse
508
509    string configname=arg()["config-dir"]+"/recursor.conf";
510    cleanSlashes(configname);
511
512    if(!arg().file(configname.c_str())) 
513      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
514
515    arg().parse(argc,argv);
516
517    arg().set("delegation-only")=toLower(arg()["delegation-only"]);
518
519    if(arg().mustDo("help")) {
520      cerr<<"syntax:"<<endl<<endl;
521      cerr<<arg().helpstring(arg()["help"])<<endl;
522      exit(99);
523    }
524
525    L.setName("pdns_recursor");
526
527    L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2005 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
528#ifdef __GNUC__
529    L<<", gcc "__VERSION__;
530#endif // add other compilers here
531    L<<") starting up"<<endl;
532
533  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
534    "This is free software, and you are welcome to redistribute it "
535    "according to the terms of the GPL version 2."<<endl;
536
537
538    if(arg().mustDo("trace"))
539      SyncRes::setLog(true);
540   
541    makeClientSocket();
542    makeUDPServerSockets();
543    makeTCPServerSockets();
544       
545    MT=new MTasker<PacketID,string>(100000);
546
547    char data[1500];
548    struct sockaddr_in fromaddr;
549   
550    PacketID pident;
551    primeHints();   
552    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
553#ifndef WIN32
554    if(arg().mustDo("daemon")) {
555      L.toConsole(Logger::Critical);
556      daemonize();
557    }
558    signal(SIGUSR1,usr1Handler);
559
560    writePid();
561#endif
562
563    int newgid=0;
564    if(!arg()["setgid"].empty())
565      newgid=Utility::makeGidNumeric(arg()["setgid"]);
566    int newuid=0;
567    if(!arg()["setuid"].empty())
568      newuid=Utility::makeUidNumeric(arg()["setuid"]);
569
570
571    if (!arg()["chroot"].empty()) {
572        if (chroot(arg()["chroot"].c_str())<0) {
573            L<<Logger::Error<<"Unable to chroot to '"+arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
574            exit(1);
575        }
576    }
577
578    Utility::dropPrivs(newuid, newgid);
579
580    vector<TCPConnection> tcpconnections;
581    counter=0;
582    time_t now;
583    unsigned int maxTcpClients=arg().asNum("max-tcp-clients");
584    for(;;) {
585      while(MT->schedule()); // housekeeping, let threads do their thing
586     
587      if(!((counter++)%100)) 
588        MT->makeThread(houseKeeping,0);
589      if(statsWanted)
590        doStats();
591
592      Utility::socklen_t addrlen=sizeof(fromaddr);
593      int d_len;
594      DNSPacket P;
595     
596      struct timeval tv;
597      tv.tv_sec=0;
598      tv.tv_usec=500000;
599     
600      fd_set readfds, writefds;
601      FD_ZERO( &readfds );
602      FD_ZERO( &writefds );
603      FD_SET( d_clientsock, &readfds );
604      int fdmax=d_clientsock;
605
606      if(!tcpconnections.empty())
607        now=time(0);
608
609      vector<TCPConnection> sweeped;
610      int tcpLimit=arg().asNum("client-tcp-timeout");
611      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
612        if(now < i->startTime + tcpLimit) {
613          FD_SET(i->fd, &readfds);
614          fdmax=max(fdmax,i->fd);
615          sweeped.push_back(*i);
616        }
617        else {
618          L<<Logger::Error<<"TCP timeout from client "<<inet_ntoa(i->remote.sin_addr)<<endl;
619          close(i->fd);
620        }
621      }
622      sweeped.swap(tcpconnections);
623
624      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
625        FD_SET( *i, &readfds );
626        fdmax=max(fdmax,*i);
627      }
628      if(tcpconnections.size() < maxTcpClients) 
629        for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) {
630          FD_SET(*i, &readfds );
631          fdmax=max(fdmax,*i);
632        }
633
634      for(map<int,PacketID>::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) {
635        // cerr<<"Adding TCP socket "<<i->first<<" to read select set"<<endl;
636        FD_SET( i->first, &readfds );
637        fdmax=max(fdmax,i->first);
638      }
639
640      for(map<int,PacketID>::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) {
641        // cerr<<"Adding TCP socket "<<i->first<<" to write select set"<<endl;
642        FD_SET( i->first, &writefds );
643        fdmax=max(fdmax,i->first);
644      }
645
646      int selret = select(  fdmax + 1, &readfds, &writefds, NULL, &tv );
647      if(selret<=0) 
648        if (selret == -1 && errno!=EINTR) 
649          throw AhuException("Select returned: "+stringerror());
650        else
651          continue;
652
653      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a question response?
654        d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
655        if(d_len<0) 
656          continue;
657       
658        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
659        if(P.parse(data,d_len)<0) {
660          L<<Logger::Error<<"Unparseable packet from remote server "<<P.getRemote()<<endl;
661        }
662        else { 
663          if(P.d.qr) {
664            pident.remote=fromaddr;
665            pident.id=P.d.id;
666            string packet;
667            packet.assign(data,d_len);
668            MT->sendEvent(pident,&packet);
669          }
670          else 
671            L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<P.getRemote()<<endl;
672        }
673      }
674     
675      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
676        if(FD_ISSET(*i,&readfds)) { // do we have a new question on udp?
677          d_len=recvfrom(*i, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
678          if(d_len<0) 
679            continue;
680          P.setRemote((struct sockaddr *)&fromaddr, addrlen);
681          if(P.parse(data,d_len)<0) {
682            L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
683          }
684          else { 
685            if(P.d.qr)
686              L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
687            else {
688              ++qcounter;
689              P.setSocket(*i);
690              P.d_tcp=false;
691              MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
692            }
693          }
694        }
695      }
696
697      for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) { 
698        if(FD_ISSET(*i ,&readfds)) { // do we have a new TCP connection
699          struct sockaddr_in addr;
700          socklen_t addrlen=sizeof(addr);
701          int newsock=accept(*i, (struct sockaddr*)&addr, &addrlen);
702         
703          if(newsock>0) {
704            Utility::setNonBlocking(newsock);
705            TCPConnection tc;
706            tc.fd=newsock;
707            tc.state=TCPConnection::BYTE0;
708            tc.remote=addr;
709            tc.startTime=time(0);
710            tcpconnections.push_back(tc);
711          }
712        }
713      }
714
715      for(map<int,PacketID>::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) { 
716        bool haveErased=false;
717        if(FD_ISSET(i->first, &readfds)) { // can we receive
718          shared_array<char> buffer(new char[i->second.inNeeded]);
719
720          int ret=read(i->first, buffer.get(), min(i->second.inNeeded,200));
721          // cerr<<"Read returned "<<ret<<endl;
722          if(ret > 0) {
723            i->second.inMSG.append(&buffer[0], &buffer[ret]);
724            i->second.inNeeded-=ret;
725            if(!i->second.inNeeded) {
726              // cerr<<"Got entire load of "<<i->second.inMSG.size()<<" bytes"<<endl;
727              PacketID pid=i->second;
728              string msg=i->second.inMSG;
729             
730              d_tcpclientreadsocks.erase((i++));
731              haveErased=true;
732              MT->sendEvent(pid, &msg);   // XXX DODGY
733            }
734            else {
735              // cerr<<"Still have "<<i->second.inNeeded<<" left to go"<<endl;
736            }
737          }
738          else {
739            cerr<<"when reading ret="<<ret<<endl;
740          }
741        }
742        if(!haveErased)
743          ++i;
744      }
745
746      for(map<int,PacketID>::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) { 
747        bool haveErased=false;
748        if(FD_ISSET(i->first, &writefds)) { // can we send over TCP
749          // cerr<<"Socket "<<i->first<<" available for writing"<<endl;
750          int ret=write(i->first, i->second.outMSG.c_str(), i->second.outMSG.size() - i->second.outPos);
751          if(ret > 0) {
752            i->second.outPos+=ret;
753            if(i->second.outPos==i->second.outMSG.size()) {
754              // cerr<<"Sent out entire load of "<<i->second.outMSG.size()<<" bytes"<<endl;
755              PacketID pid=i->second;
756              d_tcpclientwritesocks.erase((i++));
757              MT->sendEvent(pid, 0);
758              haveErased=true;
759              // cerr<<"Sent event too"<<endl;
760            }
761
762          }
763          else { 
764            cerr<<"ret="<<ret<<" when writing"<<endl;
765          }
766        }
767        if(!haveErased)
768          ++i;
769      }
770
771
772      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
773        if(FD_ISSET(i->fd, &readfds)) {
774          if(i->state==TCPConnection::BYTE0) {
775            int bytes=read(i->fd,i->data,2);
776            if(bytes==1)
777              i->state=TCPConnection::BYTE1;
778            if(bytes==2) { 
779              i->qlen=(i->data[0]<<8)+i->data[1];
780              i->bytesread=0;
781              i->state=TCPConnection::GETQUESTION;
782            }
783            if(!bytes || bytes < 0) {
784              close(i->fd);
785              tcpconnections.erase(i);
786              break;
787            }
788          }
789          else if(i->state==TCPConnection::BYTE1) {
790            int bytes=read(i->fd,i->data+1,1);
791            if(bytes==1) {
792              i->state=TCPConnection::GETQUESTION;
793              i->qlen=(i->data[0]<<8)+i->data[1];
794              i->bytesread=0;
795            }
796            if(!bytes || bytes < 0) {
797              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
798              close(i->fd);
799              tcpconnections.erase(i);
800              break;
801            }
802           
803          }
804          else if(i->state==TCPConnection::GETQUESTION) {
805            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
806            if(!bytes || bytes < 0) {
807              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
808              close(i->fd);
809              tcpconnections.erase(i);
810              break;
811            }
812            i->bytesread+=bytes;
813            if(i->bytesread==i->qlen) {
814              i->state=TCPConnection::BYTE0;
815
816              if(P.parse(i->data,i->qlen)<0) {
817                L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
818                close(i->fd);
819                tcpconnections.erase(i);
820                break;
821              }
822              else { 
823                P.setSocket(i->fd);
824                P.d_tcp=true;
825                P.setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
826                if(P.d.qr)
827                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
828                else {
829                  ++qcounter;
830                  MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
831                }
832              }
833            }
834          }
835        }
836      }
837    }
838  }
839  catch(AhuException &ae) {
840    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
841    ret=EXIT_FAILURE;
842  }
843  catch(exception &e) {
844    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
845    ret=EXIT_FAILURE;
846  }
847  catch(...) {
848    L<<Logger::Error<<"any other exception in main: "<<endl;
849    ret=EXIT_FAILURE;
850  }
851 
852#ifdef WIN32
853  WSACleanup();
854#endif // WIN32
855
856  return ret;
857}
Note: See TracBrowser for help on using the browser.