root/trunk/pdns/pdns/pdns_recursor.cc @ 442

Revision 442, 25.2 KB (checked in by ahu, 8 years ago)

add titles to mthreads
fix confusion in mtasker when waiting for duplicated keys
clarified error handling in waiting for events
improve error checking in mtasker
fix dnsreplay timing fix

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2005  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17*/
18
19#include "utility.hh"
20#include <iostream>
21#include <errno.h>
22#include <map>
23#include <set>
24#ifndef WIN32
25#include <netdb.h>
26#endif // WIN32
27#include <stdio.h>
28#include <signal.h>
29#include <stdlib.h>
30#include <unistd.h>
31#include "mtasker.hh"
32#include <utility>
33#include "dnspacket.hh"
34#include "statbag.hh"
35#include "arguments.hh"
36#include "syncres.hh"
37#include <fcntl.h>
38#include <fstream>
39#include "sstuff.hh"
40#include <boost/tuple/tuple.hpp>
41#include <boost/tuple/tuple_comparison.hpp>
42#include <boost/shared_array.hpp>
43
44using namespace boost;
45
46#include "recursor_cache.hh"
47
48#ifdef __FreeBSD__           // see cvstrac ticket #26
49#include <pthread.h>
50#include <semaphore.h>
51#endif
52
// Record cache used by the recursor: filled by primeHints(), pruned from
// houseKeeping(), and its hit/miss counters are updated in startDoResolve().
MemRecursorCache RC;

// Program name; writePid() uses it to build the pid file name.
string s_programname="pdns_recursor";
56
57#ifndef WIN32
58#ifndef __FreeBSD__
// No-op definitions of the semaphore and pthread-mutex entry points, compiled
// on every platform except WIN32 and FreeBSD. Each one ignores its arguments
// and reports success.
// NOTE(review): presumably this lets the single-threaded recursor link without
// a real threading library - confirm against cvstrac ticket #26.
extern "C" {
  // semaphore stubs
  int sem_init(sem_t*, int, unsigned int) { return 0; }
  int sem_wait(sem_t*) { return 0; }
  int sem_trywait(sem_t*) { return 0; }
  int sem_post(sem_t*) { return 0; }
  int sem_getvalue(sem_t*, int*) { return 0; }

  // pthread stubs
  pthread_t pthread_self(void) { return (pthread_t) 0; }
  int pthread_mutex_init(pthread_mutex_t*, const pthread_mutexattr_t*) { return 0; }
  int pthread_mutex_lock(pthread_mutex_t*) { return 0; }
  int pthread_mutex_unlock(pthread_mutex_t*) { return 0; }
}
71#endif // __FreeBSD__
72#endif // WIN32
73
74StatBag S;
75ArgvMap &arg()
76{
77  static ArgvMap theArg;
78  return theArg;
79}
// UDP socket our own outgoing queries are sent from (see makeClientSocket()).
static int d_clientsock;
// UDP sockets on which client questions arrive (see makeUDPServerSockets()).
static vector<int> d_udpserversocks;

// Listening TCP sockets (see makeTCPServerSockets()).
typedef vector<int> tcpserversocks_t;
static tcpserversocks_t s_tcpserversocks;
85
86struct PacketID
87{
88  PacketID() : sock(0), inNeeded(0), outPos(0)
89  {}
90
91  u_int16_t id;  // wait for a specific id/remote paie
92  struct sockaddr_in remote;  // this is the remote
93
94  Socket* sock;  // or wait for an event on a TCP fd
95  int inNeeded; // if this is set, we'll read until inNeeded bytes are read
96  string inMSG; // they'll go here
97
98  string outMSG; // the outgoing message that needs to be sent
99  int outPos;    // how far we are along in the outMSG
100
101  bool operator<(const PacketID& b) const
102  {
103    int ourSock= sock ? sock->getHandle() : 0;
104    int bSock = b.sock ? b.sock->getHandle() : 0;
105    return 
106      tie(id, remote.sin_addr.s_addr, remote.sin_port, ourSock) <
107      tie(b.id, b.remote.sin_addr.s_addr, b.remote.sin_port, bSock);
108  }
109};
110
// Pending TCP reads and writes, keyed by file descriptor. asendtcp()/arecvtcp()
// register entries here and main()'s select loop services them.
static map<int,PacketID> d_tcpclientreadsocks, d_tcpclientwritesocks;

// The cooperative thread scheduler; created in main().
MTasker<PacketID,string>* MT;
114
115int asendtcp(const string& data, Socket* sock) 
116{
117  PacketID pident;
118  pident.sock=sock;
119  pident.outMSG=data;
120  string packet;
121
122  //  cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
123  d_tcpclientwritesocks[sock->getHandle()]=pident;
124
125  int ret=MT->waitEvent(pident,&packet,1);
126  if(!ret || ret==-1) { // timeout
127    d_tcpclientwritesocks.erase(sock->getHandle());
128  }
129  return ret;
130}
131
132// -1 is error, 0 is timeout, 1 is success
133int arecvtcp(string& data, int len, Socket* sock) 
134{
135  data="";
136  PacketID pident;
137  pident.sock=sock;
138  pident.inNeeded=len;
139
140  // cerr<<"arecvtcp called for "<<len<<" bytes"<<endl;
141  // cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
142  d_tcpclientreadsocks[sock->getHandle()]=pident;
143
144  int ret=MT->waitEvent(pident,&data,1);
145  if(!ret || ret==-1) { // timeout
146    d_tcpclientreadsocks.erase(sock->getHandle());
147  }
148  return ret;
149}
150
151
152/* these two functions are used by LWRes */
153// -1 is error, > 1 is success
154int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
155{
156  return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
157}
158
159// -1 is error, 0 is timeout, 1 is success
160int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
161{
162  PacketID pident;
163  pident.id=id;
164  memcpy(&pident.remote,toaddr,sizeof(pident.remote));
165
166  string packet;
167  int ret=MT->waitEvent(pident,&packet,1);
168  if(ret > 0) {
169    *d_len=packet.size();
170    memcpy(data,packet.c_str(),min(len,*d_len));
171  }
172
173  return ret;
174}
175
176
177static void writePid(void)
178{
179  string fname=arg()["socket-dir"]+"/"+s_programname+".pid";
180  ofstream of(fname.c_str());
181  if(of)
182    of<<getpid()<<endl;
183  else
184    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
185}
186
187void primeHints(void)
188{
189  // prime root cache
190
191  static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
192                     "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
193  DNSResourceRecord arr, nsrr;
194  arr.qtype=QType::A;
195  arr.ttl=time(0)+3600000;
196  nsrr.qtype=QType::NS;
197  nsrr.ttl=time(0)+3600000;
198 
199  set<DNSResourceRecord>nsset;
200  for(char c='a';c<='m';++c) {
201    static char templ[40];
202    strncpy(templ,"a.root-servers.net", sizeof(templ) - 1);
203    *templ=c;
204    arr.qname=nsrr.content=templ;
205    arr.content=ips[c-'a'];
206    set<DNSResourceRecord>aset;
207    aset.insert(arr);
208    RC.replace(string(templ),QType(QType::A),aset);
209
210    nsset.insert(nsrr);
211  }
212  RC.replace("",QType(QType::NS),nsset); // and stuff in the cache
213}
214
215void startDoResolve(void *p)
216{
217  try {
218    bool quiet=arg().mustDo("quiet");
219    DNSPacket P=*(DNSPacket *)p;
220
221    delete (DNSPacket *)p;
222   
223    vector<DNSResourceRecord>ret;
224    DNSPacket *R=P.replyPacket();
225    R->setA(false);
226    R->setRA(true);
227    //    MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName());
228    SyncRes sr;
229    if(!quiet)
230      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (R->d_tcp ? "TCP " : "") << "question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
231
232    sr.setId(MT->getTid());
233    if(!P.d.rd)
234      sr.setCacheOnly();
235
236    int res=sr.beginResolve(P.qdomain, P.qtype, ret);
237    if(res<0)
238      R->setRcode(RCode::ServFail);
239    else {
240      R->setRcode(res);
241      for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i)
242        R->addRecord(*i);
243    }
244
245    const char *buffer=R->getData();
246
247    if(!R->d_tcp) {
248      if(R->len > 512) {
249        R->truncate(512);
250      }
251
252      sendto(R->getSocket(),buffer,R->len,0,(struct sockaddr *)(R->remote),R->d_socklen);
253    }
254    else {
255      char buf[2];
256      buf[0]=R->len/256;
257      buf[1]=R->len%256;
258
259      struct iovec iov[2];
260
261      iov[0].iov_base=(void*)buf;    iov[0].iov_len=2;
262      iov[1].iov_base=(void*)buffer; iov[1].iov_len = R->len;
263
264      int ret=writev(R->getSocket(), iov, 2);
265
266      if(ret <= 0 ) 
267        L<<Logger::Error<<"Error writing TCP answer to "<<P.getRemote()<<": "<< (ret ? strerror(errno) : "EOF") <<endl;
268      else if(ret != 2 + R->len)
269        L<<Logger::Error<<"Oops, partial answer sent to "<<P.getRemote()<<" - probably would have trouble receiving our answer anyhow (size="<<R->len<<")"<<endl;
270
271      //      if(write(R->getSocket(),buf,2)!=2 || write(R->getSocket(),buffer,R->len)!=R->len)
272      //  XXX FIXME write this writev fallback otherwise
273    }
274
275    //    MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName());
276    if(!quiet) {
277      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
278      L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
279        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
280    }
281   
282    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
283
284    delete R;
285  }
286  catch(AhuException &ae) {
287    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
288  }
289  catch(...) {
290    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
291  }
292}
293
294void makeClientSocket()
295{
296  d_clientsock=socket(AF_INET, SOCK_DGRAM,0);
297  if(d_clientsock<0) 
298    throw AhuException("Making a socket for resolver: "+stringerror());
299 
300  struct sockaddr_in sin;
301  memset((char *)&sin,0, sizeof(sin));
302 
303  sin.sin_family = AF_INET;
304
305  if(!IpToU32(arg()["query-local-address"], &sin.sin_addr.s_addr))
306    throw AhuException("Unable to resolve local address '"+ arg()["query-local-address"] +"'"); 
307
308  int tries=10;
309  while(--tries) {
310    u_int16_t port=10000+Utility::random()%10000;
311    sin.sin_port = htons(port); 
312   
313    if (bind(d_clientsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
314      break;
315   
316  }
317  if(!tries)
318    throw AhuException("Resolver binding to local socket: "+stringerror());
319
320  Utility::setNonBlocking(d_clientsock);
321  L<<Logger::Error<<"Sending UDP queries from "<<inet_ntoa(sin.sin_addr)<<":"<< ntohs(sin.sin_port)  <<endl;
322}
323
324void makeTCPServerSockets()
325{
326  vector<string>locals;
327  stringtok(locals,arg()["local-address"]," ,");
328
329  if(locals.empty())
330    throw AhuException("No local address specified");
331 
332  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
333    int fd=socket(AF_INET, SOCK_STREAM,0);
334    if(fd<0) 
335      throw AhuException("Making a server socket for resolver: "+stringerror());
336 
337    struct sockaddr_in sin;
338    memset((char *)&sin,0, sizeof(sin));
339   
340    sin.sin_family = AF_INET;
341    if(!IpToU32(*i, &sin.sin_addr.s_addr))
342      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
343
344    int tmp=1;
345    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
346      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
347      exit(1); 
348    }
349   
350    sin.sin_port = htons(arg().asNum("local-port")); 
351   
352    if (bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
353      throw AhuException("Binding TCP server socket for "+*i+": "+stringerror());
354   
355    Utility::setNonBlocking(fd);
356    listen(fd, 128);
357    s_tcpserversocks.push_back(fd);
358    L<<Logger::Error<<"Listening for TCP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<arg().asNum("local-port")<<endl;
359  }
360}
361
362void makeUDPServerSockets()
363{
364  vector<string>locals;
365  stringtok(locals,arg()["local-address"]," ,");
366
367  if(locals.empty())
368    throw AhuException("No local address specified");
369 
370  if(arg()["local-address"]=="0.0.0.0") {
371    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
372  }
373
374  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
375    int fd=socket(AF_INET, SOCK_DGRAM,0);
376    if(fd<0) 
377      throw AhuException("Making a server socket for resolver: "+stringerror());
378 
379    struct sockaddr_in sin;
380    memset((char *)&sin,0, sizeof(sin));
381   
382    sin.sin_family = AF_INET;
383    if(!IpToU32(*i, &sin.sin_addr.s_addr))
384      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
385   
386    sin.sin_port = htons(arg().asNum("local-port")); 
387   
388    if (bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
389      throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror());
390   
391    Utility::setNonBlocking(fd);
392    d_udpserversocks.push_back(fd);
393    L<<Logger::Error<<"Listening for UDP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<arg().asNum("local-port")<<endl;
394  }
395}
396
397
398#ifndef WIN32
399void daemonize(void)
400{
401  if(fork())
402    exit(0); // bye bye
403 
404  setsid(); 
405
406  // cleanup open fds, but skip sockets
407  close(0);
408  close(1);
409  close(2);
410
411}
412#endif
413
// counter: number of main loop iterations (main() uses it to schedule housekeeping).
// qcounter: number of client questions accepted so far.
int counter, qcounter;
// Set by the SIGUSR1 handler; main() calls doStats() when it is set and
// doStats() clears it again.
bool statsWanted;

// SIGUSR1 handler: only raises the flag, the statistics are printed later
// from the main loop.
void usr1Handler(int)
{
  statsWanted=true;
}

// SIGUSR2 handler: switches on SyncRes logging.
// NOTE(review): this calls SyncRes::setLog() directly from signal context -
// confirm that it is safe to do so.
void usr2Handler(int)
{
  SyncRes::setLog(true);
}
426
427
428
429void doStats(void)
430{
431  if(qcounter) {
432    L<<Logger::Error<<"stats: "<<qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
433     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits";
434    L<<Logger::Error<<", outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
435    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
436     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
437    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
438  }
439  else if(statsWanted) 
440    L<<Logger::Error<<"stats: no stats yet!"<<endl;
441
442  statsWanted=false;
443}
444
445void houseKeeping(void *)
446{
447  static time_t last_stat, last_rootupdate, last_prune;
448  time_t now=time(0);
449  if(now - last_prune > 60) { 
450    RC.doPrune();
451    last_prune=time(0);
452  }
453  if(now - last_stat>1800) { 
454    doStats();
455    last_stat=time(0);
456  }
457  if(now -last_rootupdate>7200) {
458    SyncRes sr;
459    vector<DNSResourceRecord>ret;
460
461    sr.setNoCache();
462    int res=sr.beginResolve("", QType(QType::NS), ret);
463    if(!res) {
464      L<<Logger::Error<<"Refreshed . records"<<endl;
465      last_rootupdate=now;
466    }
467    else
468      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
469  }
470}
471
/**
 * State of one incoming TCP client connection while its question is read.
 *
 * A question arrives as a two-byte length followed by that many bytes; the
 * 'state' member records which part is expected next.
 */
struct TCPConnection
{
  int fd;                      // the connected socket

  // BYTE0: nothing of the length read yet; BYTE1: first length byte read;
  // GETQUESTION: length known, reading the question itself
  enum {BYTE0, BYTE1, GETQUESTION} state;

  int qlen;                    // length of the question as announced by the client
  int bytesread;               // bytes of the question received so far
  struct sockaddr_in remote;   // address of the client
  char data[65535];            // length bytes first, then reused for the question
  time_t startTime;            // when the connection was accepted
};
482
483int main(int argc, char **argv) 
484{
485  int ret = EXIT_SUCCESS;
486#ifdef WIN32
487    WSADATA wsaData;
488    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
489#endif // WIN32
490
491  try {
492    Utility::srandom(time(0));
493    arg().set("soa-minimum-ttl","Don't change")="0";
494    arg().set("soa-serial-offset","Don't change")="0";
495    arg().set("no-shuffle","Don't change")="off";
496    arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
497    arg().set("local-port","port to listen on")="53";
498    arg().set("local-address","IP addresses to listen on, separated by spaces or commas")="0.0.0.0";
499    arg().set("trace","if we should output heaps of logging")="off";
500    arg().set("daemon","Operate as a daemon")="yes";
501    arg().set("chroot","switch to chroot jail")="";
502    arg().set("setgid","If set, change group id to this gid for more security")="";
503    arg().set("setuid","If set, change user id to this uid for more security")="";
504    arg().set("quiet","Suppress logging of questions and answers")="off";
505    arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
506    arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
507    arg().set("delegation-only","Which domains we only accept delegations from")="";
508    arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
509    arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
510    arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
511
512    arg().setCmd("help","Provide a helpful message");
513    L.toConsole(Logger::Warning);
514    arg().laxParse(argc,argv); // do a lax parse
515
516    string configname=arg()["config-dir"]+"/recursor.conf";
517    cleanSlashes(configname);
518
519    if(!arg().file(configname.c_str())) 
520      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
521
522    arg().parse(argc,argv);
523
524    arg().set("delegation-only")=toLower(arg()["delegation-only"]);
525
526    if(arg().mustDo("help")) {
527      cerr<<"syntax:"<<endl<<endl;
528      cerr<<arg().helpstring(arg()["help"])<<endl;
529      exit(99);
530    }
531
532    L.setName("pdns_recursor");
533
534    L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2005 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
535#ifdef __GNUC__
536    L<<", gcc "__VERSION__;
537#endif // add other compilers here
538    L<<") starting up"<<endl;
539
540  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
541    "This is free software, and you are welcome to redistribute it "
542    "according to the terms of the GPL version 2."<<endl;
543
544
545    if(arg().mustDo("trace"))
546      SyncRes::setLog(true);
547   
548    makeClientSocket();
549    makeUDPServerSockets();
550    makeTCPServerSockets();
551       
552    MT=new MTasker<PacketID,string>(100000);
553
554    char data[1500];
555    struct sockaddr_in fromaddr;
556   
557    PacketID pident;
558    primeHints();   
559    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
560#ifndef WIN32
561    if(arg().mustDo("daemon")) {
562      L.toConsole(Logger::Critical);
563      daemonize();
564    }
565    signal(SIGUSR1,usr1Handler);
566    signal(SIGUSR2,usr2Handler);
567
568    writePid();
569#endif
570
571    int newgid=0;
572    if(!arg()["setgid"].empty())
573      newgid=Utility::makeGidNumeric(arg()["setgid"]);
574    int newuid=0;
575    if(!arg()["setuid"].empty())
576      newuid=Utility::makeUidNumeric(arg()["setuid"]);
577
578
579    if (!arg()["chroot"].empty()) {
580        if (chroot(arg()["chroot"].c_str())<0) {
581            L<<Logger::Error<<"Unable to chroot to '"+arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
582            exit(1);
583        }
584    }
585
586    Utility::dropPrivs(newuid, newgid);
587
588    vector<TCPConnection> tcpconnections;
589    counter=0;
590    time_t now;
591    unsigned int maxTcpClients=arg().asNum("max-tcp-clients");
592    for(;;) {
593      while(MT->schedule()); // housekeeping, let threads do their thing
594     
595      if(!((counter++)%100)) 
596        MT->makeThread(houseKeeping,0,"housekeeping");
597      if(statsWanted)
598        doStats();
599
600      Utility::socklen_t addrlen=sizeof(fromaddr);
601      int d_len;
602      DNSPacket P;
603     
604      struct timeval tv;
605      tv.tv_sec=0;
606      tv.tv_usec=500000;
607     
608      fd_set readfds, writefds;
609      FD_ZERO( &readfds );
610      FD_ZERO( &writefds );
611      FD_SET( d_clientsock, &readfds );
612      int fdmax=d_clientsock;
613
614      if(!tcpconnections.empty())
615        now=time(0);
616
617      vector<TCPConnection> sweeped;
618      int tcpLimit=arg().asNum("client-tcp-timeout");
619      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
620        if(now < i->startTime + tcpLimit) {
621          FD_SET(i->fd, &readfds);
622          fdmax=max(fdmax,i->fd);
623          sweeped.push_back(*i);
624        }
625        else {
626          L<<Logger::Error<<"TCP timeout from client "<<inet_ntoa(i->remote.sin_addr)<<endl;
627          close(i->fd);
628        }
629      }
630      sweeped.swap(tcpconnections);
631
632      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
633        FD_SET( *i, &readfds );
634        fdmax=max(fdmax,*i);
635      }
636      if(tcpconnections.size() < maxTcpClients) 
637        for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) {
638          FD_SET(*i, &readfds );
639          fdmax=max(fdmax,*i);
640        }
641
642      for(map<int,PacketID>::const_iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end(); ++i) {
643        // cerr<<"Adding TCP socket "<<i->first<<" to read select set"<<endl;
644        FD_SET( i->first, &readfds );
645        fdmax=max(fdmax,i->first);
646      }
647
648      for(map<int,PacketID>::const_iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ++i) {
649        // cerr<<"Adding TCP socket "<<i->first<<" to write select set"<<endl;
650        FD_SET( i->first, &writefds );
651        fdmax=max(fdmax,i->first);
652      }
653
654      int selret = select(  fdmax + 1, &readfds, &writefds, NULL, &tv );
655      if(selret<=0) 
656        if (selret == -1 && errno!=EINTR) 
657          throw AhuException("Select returned: "+stringerror());
658        else
659          continue;
660
661      if(FD_ISSET(d_clientsock,&readfds)) { // do we have a question response?
662        d_len=recvfrom(d_clientsock, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
663        if(d_len<0) 
664          continue;
665       
666        P.setRemote((struct sockaddr *)&fromaddr, addrlen);
667        if(P.parse(data,d_len)<0) {
668          L<<Logger::Error<<"Unparseable packet from remote server "<<P.getRemote()<<endl;
669        }
670        else { 
671          if(P.d.qr) {
672            pident.remote=fromaddr;
673            pident.id=P.d.id;
674            string packet;
675            packet.assign(data,d_len);
676            MT->sendEvent(pident,&packet);
677          }
678          else 
679            L<<Logger::Warning<<"Ignoring question on outgoing socket from "<<P.getRemote()<<endl;
680        }
681      }
682     
683      for(vector<int>::const_iterator i=d_udpserversocks.begin(); i!=d_udpserversocks.end(); ++i) {
684        if(FD_ISSET(*i,&readfds)) { // do we have a new question on udp?
685          d_len=recvfrom(*i, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);   
686          if(d_len<0) 
687            continue;
688          P.setRemote((struct sockaddr *)&fromaddr, addrlen);
689          if(P.parse(data,d_len)<0) {
690            L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
691          }
692          else { 
693            if(P.d.qr)
694              L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
695            else {
696              ++qcounter;
697              P.setSocket(*i);
698              P.d_tcp=false;
699              MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp");
700            }
701          }
702        }
703      }
704
705      for(tcpserversocks_t::const_iterator i=s_tcpserversocks.begin(); i!=s_tcpserversocks.end(); ++i) { 
706        if(FD_ISSET(*i ,&readfds)) { // do we have a new TCP connection
707          struct sockaddr_in addr;
708          socklen_t addrlen=sizeof(addr);
709          int newsock=accept(*i, (struct sockaddr*)&addr, &addrlen);
710         
711          if(newsock>0) {
712            Utility::setNonBlocking(newsock);
713            TCPConnection tc;
714            tc.fd=newsock;
715            tc.state=TCPConnection::BYTE0;
716            tc.remote=addr;
717            tc.startTime=time(0);
718            tcpconnections.push_back(tc);
719          }
720        }
721      }
722
723      for(map<int,PacketID>::iterator i=d_tcpclientreadsocks.begin(); i!=d_tcpclientreadsocks.end();) { 
724        bool haveErased=false;
725        if(FD_ISSET(i->first, &readfds)) { // can we receive
726          shared_array<char> buffer(new char[i->second.inNeeded]);
727
728          int ret=read(i->first, buffer.get(), min(i->second.inNeeded,200));
729          // cerr<<"Read returned "<<ret<<endl;
730          if(ret > 0) {
731            i->second.inMSG.append(&buffer[0], &buffer[ret]);
732            i->second.inNeeded-=ret;
733            if(!i->second.inNeeded) {
734              // cerr<<"Got entire load of "<<i->second.inMSG.size()<<" bytes"<<endl;
735              PacketID pid=i->second;
736              string msg=i->second.inMSG;
737             
738              d_tcpclientreadsocks.erase((i++));
739              haveErased=true;
740              MT->sendEvent(pid, &msg);   // XXX DODGY
741            }
742            else {
743              // cerr<<"Still have "<<i->second.inNeeded<<" left to go"<<endl;
744            }
745          }
746          else {
747            //      cerr<<"when reading ret="<<ret<<endl;
748            // XXX FIXME I think some stuff needs to happen here - like send an EOF event
749          }
750        }
751        if(!haveErased)
752          ++i;
753      }
754
755      for(map<int,PacketID>::iterator i=d_tcpclientwritesocks.begin(); i!=d_tcpclientwritesocks.end(); ) { 
756        bool haveErased=false;
757        if(FD_ISSET(i->first, &writefds)) { // can we send over TCP
758          // cerr<<"Socket "<<i->first<<" available for writing"<<endl;
759          int ret=write(i->first, i->second.outMSG.c_str(), i->second.outMSG.size() - i->second.outPos);
760          if(ret > 0) {
761            i->second.outPos+=ret;
762            if(i->second.outPos==i->second.outMSG.size()) {
763              // cerr<<"Sent out entire load of "<<i->second.outMSG.size()<<" bytes"<<endl;
764              PacketID pid=i->second;
765              d_tcpclientwritesocks.erase((i++));
766              MT->sendEvent(pid, 0);
767              haveErased=true;
768              // cerr<<"Sent event too"<<endl;
769            }
770
771          }
772          else { 
773            //      cerr<<"ret="<<ret<<" when writing"<<endl;
774            // XXX FIXME I think some stuff needs to happen here - like send an EOF event
775          }
776        }
777        if(!haveErased)
778          ++i;
779      }
780
781
782      for(vector<TCPConnection>::iterator i=tcpconnections.begin();i!=tcpconnections.end();++i) {
783        if(FD_ISSET(i->fd, &readfds)) {
784          if(i->state==TCPConnection::BYTE0) {
785            int bytes=read(i->fd,i->data,2);
786            if(bytes==1)
787              i->state=TCPConnection::BYTE1;
788            if(bytes==2) { 
789              i->qlen=(i->data[0]<<8)+i->data[1];
790              i->bytesread=0;
791              i->state=TCPConnection::GETQUESTION;
792            }
793            if(!bytes || bytes < 0) {
794              close(i->fd);
795              tcpconnections.erase(i);
796              break;
797            }
798          }
799          else if(i->state==TCPConnection::BYTE1) {
800            int bytes=read(i->fd,i->data+1,1);
801            if(bytes==1) {
802              i->state=TCPConnection::GETQUESTION;
803              i->qlen=(i->data[0]<<8)+i->data[1];
804              i->bytesread=0;
805            }
806            if(!bytes || bytes < 0) {
807              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected after first byte"<<endl;
808              close(i->fd);
809              tcpconnections.erase(i);
810              break;
811            }
812           
813          }
814          else if(i->state==TCPConnection::GETQUESTION) {
815            int bytes=read(i->fd,i->data + i->bytesread,i->qlen - i->bytesread);
816            if(!bytes || bytes < 0) {
817              L<<Logger::Error<<"TCP Remote "<<sockAddrToString(&i->remote,sizeof(i->remote))<<" disconnected while reading question body"<<endl;
818              close(i->fd);
819              tcpconnections.erase(i);
820              break;
821            }
822            i->bytesread+=bytes;
823            if(i->bytesread==i->qlen) {
824              i->state=TCPConnection::BYTE0;
825
826              if(P.parse(i->data,i->qlen)<0) {
827                L<<Logger::Error<<"Unparseable packet from remote client "<<P.getRemote()<<endl;
828                close(i->fd);
829                tcpconnections.erase(i);
830                break;
831              }
832              else { 
833                P.setSocket(i->fd);
834                P.d_tcp=true;
835                P.setRemote((struct sockaddr *)&i->remote,sizeof(i->remote));
836                if(P.d.qr)
837                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
838                else {
839                  ++qcounter;
840                  MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp");
841                }
842              }
843            }
844          }
845        }
846      }
847    }
848  }
849  catch(AhuException &ae) {
850    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
851    ret=EXIT_FAILURE;
852  }
853  catch(exception &e) {
854    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
855    ret=EXIT_FAILURE;
856  }
857  catch(...) {
858    L<<Logger::Error<<"any other exception in main: "<<endl;
859    ret=EXIT_FAILURE;
860  }
861 
862#ifdef WIN32
863  WSACleanup();
864#endif // WIN32
865
866  return ret;
867}
Note: See TracBrowser for help on using the browser.