root/trunk/pdns/pdns/pdns_recursor.cc @ 722

Revision 722, 39.4 KB (checked in by ahu, 7 years ago)

fix timeouts of running TCP client queries - we shouldn't time them out while we are still working!

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2006  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#include "utility.hh"
20#include <iostream>
21#include <errno.h>
22#include <map>
23#include <set>
24#ifndef WIN32
25#include <netdb.h>
26#endif // WIN32
27#include "recursor_cache.hh"
28#include <stdio.h>
29#include <signal.h>
30#include <stdlib.h>
31#include <unistd.h>
32#include <netinet/tcp.h>
33#include "mtasker.hh"
34#include <utility>
35#include "arguments.hh"
36#include "syncres.hh"
37#include <fcntl.h>
38#include <fstream>
39#include "sstuff.hh"
40#include <boost/tuple/tuple.hpp>
41#include <boost/tuple/tuple_comparison.hpp>
42#include <boost/shared_array.hpp>
43#include <boost/lexical_cast.hpp>
44#include <boost/function.hpp>
45#include "dnsparser.hh"
46#include "dnswriter.hh"
47#include "dnsrecords.hh"
48#include "zoneparser-tng.hh"
49#include "rec_channel.hh"
50#include "logger.hh"
51#include "iputils.hh"
52#include "mplexer.hh"
53
#ifndef RECURSOR
// NOTE(review): presumably required by shared (non-recursor) code that references a StatBag
#include "statbag.hh"
StatBag S;
#endif

// Multiplexer that dispatches socket readiness callbacks (read/write FDs registered below).
FDMultiplexer* g_fdm;
// Cap on concurrent TCP connections per client IPv4 address; 0 disables the check.
unsigned int g_maxTCPPerClient;
// When set, also log client errors that are routine (early disconnects, stray answers).
bool g_logCommonErrors;
using namespace boost;

#ifdef __FreeBSD__           // see cvstrac ticket #26
#include <pthread.h>
#include <semaphore.h>
#endif

// The record cache.
MemRecursorCache RC;
// Query/answer counters; several of them are reported by doStats().
RecursorStats g_stats;
// When true, startDoResolve() does not log every question and answer.
bool g_quiet;
// If non-null, only clients matching this netmask group are served (UDP and TCP).
NetmaskGroup* g_allowFrom;
// Used to build the pid file name in writePid().
string s_programname="pdns_recursor";
// The most recently created TCP listening socket (one is created per local address).
int g_tcpListenSocket;
// Idle timeout handed to setReadTTD() for client TCP connections.
int g_tcpTimeout;
76
77struct DNSComboWriter {
78  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), d_tcp(false), d_socket(-1)
79  {}
80  MOADNSParser d_mdp;
81  void setRemote(struct sockaddr* sa, socklen_t len)
82  {
83    memcpy((void *)d_remote, (void *)sa, len);
84    d_socklen=len;
85  }
86
87  void setSocket(int sock)
88  {
89    d_socket=sock;
90  }
91
92  string getRemote() const
93  {
94    return sockAddrToString((struct sockaddr_in *)d_remote, d_socklen);
95  }
96
97  struct timeval d_now;
98  char d_remote[sizeof(sockaddr_in6)];
99  socklen_t d_socklen;
100  bool d_tcp;
101  int d_socket;
102};
103
104
#ifndef WIN32
#ifndef __FreeBSD__
// No-op replacements for the POSIX semaphore and pthread mutex entry points.
// Every stub reports success and does nothing; pthread_self() always returns 0.
// NOTE(review): presumably this lets code that expects thread primitives link
// without libpthread, since queries run as MTasker threads inside one OS thread.
// Confirm nothing in the binary relies on real locking before linking a threaded
// library. On FreeBSD the real headers are included instead (see ticket #26).
extern "C" {
  int sem_init(sem_t*, int, unsigned int){return 0;}
  int sem_wait(sem_t*){return 0;}
  int sem_trywait(sem_t*){return 0;}
  int sem_post(sem_t*){return 0;}
  int sem_getvalue(sem_t*, int*){return 0;}
  pthread_t pthread_self(void){return (pthread_t) 0;}
  int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr){ return 0; }
  int pthread_mutex_lock(pthread_mutex_t *mutex){ return 0; }
  int pthread_mutex_unlock(pthread_mutex_t *mutex) { return 0; }
  int pthread_mutex_destroy(pthread_mutex_t *mutex) { return 0; }
}
#endif // __FreeBSD__
#endif // WIN32
121
122ArgvMap &arg()
123{
124  static ArgvMap theArg;
125  return theArg;
126}
127
// Timestamp that new questions are stamped with (DNSComboWriter) and that TCP connections
// record as their start time; presumably refreshed by the main loop on every iteration.
struct timeval g_now;
typedef vector<int> tcpserversocks_t;

// MTasker instance running the per-question resolver threads. Events are keyed by PacketID
// and carry a string payload (see waitEvent/sendEvent below).
typedef MTasker<PacketID,string> MT_t;
MT_t* MT;


void handleTCPClientWritable(int fd, boost::any& var);
136
137// -1 is error, 0 is timeout, 1 is success
138int asendtcp(const string& data, Socket* sock) 
139{
140  PacketID pident;
141  pident.sock=sock;
142  pident.outMSG=data;
143
144  g_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
145  string packet;
146
147  int ret=MT->waitEvent(pident,&packet,1);
148  if(!ret || ret==-1) { // timeout
149    g_fdm->removeWriteFD(sock->getHandle());
150  }
151  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
152    return -1;
153  }
154  return ret;
155}
156
157void handleTCPClientReadable(int fd, boost::any& var);
158
159// -1 is error, 0 is timeout, 1 is success
160int arecvtcp(string& data, int len, Socket* sock) 
161{
162  data.clear();
163  PacketID pident;
164  pident.sock=sock;
165  pident.inNeeded=len;
166  g_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
167
168  int ret=MT->waitEvent(pident,&data,1);
169  if(!ret || ret==-1) { // timeout
170    g_fdm->removeReadFD(sock->getHandle());
171  }
172  else if(data.empty()) {// error, EOF or other
173    return -1;
174  }
175
176  return ret;
177}
178
179// returns -1 for errors which might go away, throws for ones that won't
180int makeClientSocket()
181{
182  int ret=socket(AF_INET, SOCK_DGRAM, 0);
183  if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
184    return ret;
185
186  if(ret<0) 
187    throw AhuException("Making a socket for resolver: "+stringerror());
188
189  static optional<struct sockaddr_in> sin;
190  if(!sin) {
191    struct sockaddr_in tmp;
192    sin=tmp;
193    memset((char *)&*sin,0, sizeof(sin));
194    sin->sin_family = AF_INET;
195   
196    if(!IpToU32(::arg()["query-local-address"], &sin->sin_addr.s_addr))
197      throw AhuException("Unable to resolve local address '"+ ::arg()["query-local-address"] +"'"); 
198  }
199 
200  int tries=10;
201  while(--tries) {
202    uint16_t port=1025+Utility::random()%64510;
203    if(tries==1)  // fall back to kernel 'random'
204        port=0;
205
206    sin->sin_port = htons(port); 
207   
208    if (::bind(ret, (struct sockaddr *)&*sin, sizeof(*sin)) >= 0) 
209      break;
210  }
211  if(!tries)
212    throw AhuException("Resolver binding to local query client socket: "+stringerror());
213
214  Utility::setNonBlocking(ret);
215  return ret;
216}
217
218void handleUDPServerResponse(int fd, boost::any&);
219
220// you can ask this class for a UDP socket to send a query from
221// this socket is not yours, don't even think about deleting it
222// but after you call 'returnSocket' on it, don't assume anything anymore
223class UDPClientSocks
224{
225  unsigned int d_numsocks;
226  unsigned int d_maxsocks;
227
228public:
229  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
230  {
231  }
232
233  typedef map<int,int> socks_t;
234  socks_t d_socks;
235
236  // returning -1 means: temporary OS error (ie, out of files)
237  int getSocket()
238  {
239    pair<int, int> sock=make_pair(makeClientSocket(), 1);
240    if(sock.first < 0) // temporary error - exception otherwise
241      return -1;
242
243    d_socks.insert(sock);
244    d_numsocks++;
245    return sock.first;
246  }
247
248  void returnSocket(int fd)
249  {
250    socks_t::iterator i=d_socks.find(fd);
251    returnSocket(i);
252  }
253
254  // return a socket to the pool, or simply erase it
255  void returnSocket(socks_t::iterator& i)
256  {
257    g_fdm->removeReadFD(i->first);
258    ::close(i->first);
259   
260    d_socks.erase(i++);
261    --d_numsocks;
262  }
263}g_udpclientsocks;
264
265
266/* these two functions are used by LWRes */
267// -2 is OS error, -1 is error that depends on the remote, > 1 is success
268int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id, const string& domain, int* fd) 
269{
270  *fd=g_udpclientsocks.getSocket();
271  if(*fd < 0)
272    return -2;
273  PacketID pident;
274  pident.fd=*fd;
275  pident.id=id;
276  pident.domain=domain;
277  memcpy(&pident.remote, toaddr, sizeof(pident.remote));
278 
279  int ret=connect(*fd, toaddr, addrlen);
280  if(ret < 0)
281    return ret;
282
283  g_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
284  return send(*fd, data, len, 0);
285}
286
287// -1 is error, 0 is timeout, 1 is success
288int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id, const string& domain, int fd)
289{
290  static optional<unsigned int> nearMissLimit;
291  if(!nearMissLimit) 
292    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
293
294  PacketID pident;
295  pident.fd=fd;
296  pident.id=id;
297  pident.domain=domain;
298  memcpy(&pident.remote, toaddr, sizeof(pident.remote));
299
300  string packet;
301  int ret=MT->waitEvent(pident, &packet, 1);
302  if(ret > 0) {
303    if(packet.empty()) {// means "error"
304      return -1; 
305    }
306
307    *d_len=packet.size();
308    memcpy(data,packet.c_str(),min(len,*d_len));
309    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
310      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<sockAddrToString((struct sockaddr_in*)toaddr, sizeof(pident.remote))<<", assuming spoof attempt."<<endl;
311      g_stats.spoofCount++;
312      return -1;
313    }
314  }
315  else {
316    g_udpclientsocks.returnSocket(fd);
317  }
318  return ret;
319}
320
321void setBuffer(int fd, int optname, uint32_t size)
322{
323  uint32_t psize=0;
324  socklen_t len=sizeof(psize);
325 
326  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
327    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
328    return; 
329  }
330
331  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
332    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
333}
334
335
336static void setReceiveBuffer(int fd, uint32_t size)
337{
338  setBuffer(fd, SO_RCVBUF, size);
339}
340
341static void setSendBuffer(int fd, uint32_t size)
342{
343  setBuffer(fd, SO_SNDBUF, size);
344}
345
346static void writePid(void)
347{
348  string fname=::arg()["socket-dir"]+"/"+s_programname+".pid";
349  ofstream of(fname.c_str());
350  if(of)
351    of<< getpid() <<endl;
352  else
353    L<<Logger::Error<<"Requested to write pid for "<<getpid()<<" to "<<fname<<" failed: "<<strerror(errno)<<endl;
354}
355
356void primeHints(void)
357{
358  // prime root cache
359  set<DNSResourceRecord>nsset;
360
361  if(::arg()["hint-file"].empty()) {
362    static char*ips[]={"198.41.0.4", "192.228.79.201", "192.33.4.12", "128.8.10.90", "192.203.230.10", "192.5.5.241", "192.112.36.4", "128.63.2.53", 
363                       "192.36.148.17","192.58.128.30", "193.0.14.129", "198.32.64.12", "202.12.27.33"};
364    DNSResourceRecord arr, nsrr;
365    arr.qtype=QType::A;
366    arr.ttl=time(0)+3600000;
367    nsrr.qtype=QType::NS;
368    nsrr.ttl=time(0)+3600000;
369   
370    for(char c='a';c<='m';++c) {
371      static char templ[40];
372      strncpy(templ,"a.root-servers.net.", sizeof(templ) - 1);
373      *templ=c;
374      arr.qname=nsrr.content=templ;
375      arr.content=ips[c-'a'];
376      set<DNSResourceRecord> aset;
377      aset.insert(arr);
378      RC.replace(string(templ), QType(QType::A), aset);
379     
380      nsset.insert(nsrr);
381    }
382  }
383  else {
384    ZoneParserTNG zpt(::arg()["hint-file"]);
385    DNSResourceRecord rr;
386    set<DNSResourceRecord> aset;
387
388    while(zpt.get(rr)) {
389      rr.ttl+=time(0);
390      if(rr.qtype.getCode()==QType::A) {
391        set<DNSResourceRecord> aset;
392        aset.insert(rr);
393        RC.replace(rr.qname, QType(QType::A), aset);
394      }
395      if(rr.qtype.getCode()==QType::NS) {
396        rr.content=toLower(rr.content);
397        nsset.insert(rr);
398      }
399    }
400  }
401  RC.replace(".", QType(QType::NS), nsset); // and stuff in the cache
402}
403
//! Open TCP connections per client IPv4 address (sin_addr.s_addr); an entry is erased when
//! its count drops to zero.
map<uint32_t, uint32_t> g_tcpClientCounts;

//! State of one incoming client TCP connection; kept as the multiplexer read parameter.
struct TCPConnection
{
  int fd;
  // BYTE0/BYTE1: reading the 2-byte length prefix; GETQUESTION: reading the body;
  // DONE: question handed to a resolver thread
  enum stateenum {BYTE0, BYTE1, GETQUESTION, DONE} state;
  int qlen;        // question length taken from the prefix
  int bytesread;   // bytes of the question body read so far
  struct sockaddr_in remote;
  char data[65535];
  time_t startTime;

  //! Close the socket and undo this connection's share of the per-client and global counters.
  void closeAndCleanup()
  {
    close(fd);
    // pre-decrement, so the entry goes away exactly when the last connection from this
    // address closes; find() avoids creating an entry that was never counted
    map<uint32_t, uint32_t>::iterator i=g_tcpClientCounts.find(remote.sin_addr.s_addr);
    if(i!=g_tcpClientCounts.end() && !--i->second)
      g_tcpClientCounts.erase(i);
    s_currentConnections--;
  }
  static unsigned int s_currentConnections; //!< total number of current TCP connections
};

unsigned int TCPConnection::s_currentConnections; 
427
428void startDoResolve(void *p)
429{
430  DNSComboWriter* dc=(DNSComboWriter *)p;
431  try {
432    uint16_t maxudpsize=512;
433    MOADNSParser::EDNSOpts edo;
434    if(dc->d_mdp.getEDNSOpts(&edo)) {
435      maxudpsize=edo.d_packetsize;
436    }
437   
438    vector<DNSResourceRecord> ret;
439   
440    vector<uint8_t> packet;
441    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
442
443    pw.getHeader()->aa=0;
444    pw.getHeader()->ra=1;
445    pw.getHeader()->qr=1;
446    pw.getHeader()->id=dc->d_mdp.d_header.id;
447    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
448
449    SyncRes sr(dc->d_now);
450    if(!g_quiet)
451      L<<Logger::Error<<"["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
452       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
453
454    sr.setId(MT->getTid());
455    if(!dc->d_mdp.d_header.rd)
456      sr.setCacheOnly();
457
458    int res=sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret);
459    if(res<0) {
460      pw.getHeader()->rcode=RCode::ServFail;
461      // no commit here, because no record
462      g_stats.servFails++;
463    }
464    else {
465      pw.getHeader()->rcode=res;
466      switch(res) {
467      case RCode::ServFail:
468        g_stats.servFails++;
469        break;
470      case RCode::NXDomain:
471        g_stats.nxDomains++;
472        break;
473      case RCode::NoError:
474        g_stats.noErrors++;
475        break;
476      }
477     
478      if(ret.size()) {
479        shuffle(ret);
480        for(vector<DNSResourceRecord>::const_iterator i=ret.begin();i!=ret.end();++i) {
481          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, 1, (DNSPacketWriter::Place)i->d_place);
482         
483          shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), 1, i->content));
484         
485          drc->toPacket(pw);
486       
487          if(!dc->d_tcp && pw.size() > maxudpsize) {
488            pw.rollback();
489            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
490              pw.getHeader()->tc=1;
491            goto sendit; // need to jump over pw.commit
492          }
493        }
494        pw.commit();
495      }
496    }
497  sendit:;
498    if(!dc->d_tcp) {
499      sendto(dc->d_socket, &*packet.begin(), packet.size(), 0, (struct sockaddr *)(dc->d_remote), dc->d_socklen);
500    }
501    else {
502      char buf[2];
503      buf[0]=packet.size()/256;
504      buf[1]=packet.size()%256;
505
506      struct iovec iov[2];
507
508      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
509      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
510
511      int ret=writev(dc->d_socket, iov, 2);
512      bool hadError=true;
513
514      if(ret == 0) 
515        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
516      else if(ret < 0 ) 
517        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
518      else if((unsigned int)ret != 2 + packet.size())
519        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
520      else
521        hadError=false;
522     
523      // update tcp connection status, either by closing or moving to 'BYTE0'
524
525      if(hadError) {
526        g_fdm->removeReadFD(dc->d_socket);
527        close(dc->d_socket);
528      }
529      else {
530        any_cast<TCPConnection&>(g_fdm->getReadParameter(dc->d_socket)).state=TCPConnection::BYTE0;
531        struct timeval now; 
532        gettimeofday(&now, 0); // needs to be updated
533        g_fdm->setReadTTD(dc->d_socket, now, g_tcpTimeout);
534      }
535    }
536
537    if(!g_quiet) {
538      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
539      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
540        sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
541    }
542   
543    sr.d_outqueries ? RC.cacheMisses++ : RC.cacheHits++; 
544    float spent=makeFloat(sr.d_now-dc->d_now);
545    if(spent < 0.001)
546      g_stats.answers0_1++;
547    else if(spent < 0.010)
548      g_stats.answers1_10++;
549    else if(spent < 0.1)
550      g_stats.answers10_100++;
551    else if(spent < 1.0)
552      g_stats.answers100_1000++;
553    else
554      g_stats.answersSlow++;
555
556    uint64_t newLat=(uint64_t)(spent*1000000);
557    if(newLat < 1000000)  // outliers of several minutes exist..
558      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
559    delete dc;
560  }
561  catch(AhuException &ae) {
562    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
563  }
564  catch(MOADNSException& e) {
565    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
566  }
567  catch(exception& e) {
568    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
569  }
570  catch(...) {
571    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
572  }
573}
574
575RecursorControlChannel s_rcc;
576
577void makeControlChannelSocket()
578{
579  string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
580  if(::arg().mustDo("fork")) {
581    sockname+="."+lexical_cast<string>(getpid());
582    L<<Logger::Warning<<"Forked control socket name: "<<sockname<<endl;
583  }
584  s_rcc.listen(sockname);
585}
586
587void handleRunningTCPQuestion(int fd, boost::any& var)
588{
589  TCPConnection& conn=any_cast<TCPConnection&>(var);
590
591  if(conn.state==TCPConnection::BYTE0) {
592    int bytes=read(conn.fd,conn.data,2);
593    if(bytes==1)
594      conn.state=TCPConnection::BYTE1;
595    if(bytes==2) { 
596      conn.qlen=(conn.data[0]<<8)+conn.data[1];
597      conn.bytesread=0;
598      conn.state=TCPConnection::GETQUESTION;
599    }
600    if(!bytes || bytes < 0) {
601      TCPConnection tmp(conn); 
602      g_fdm->removeReadFD(fd);
603      tmp.closeAndCleanup();
604      return;
605    }
606  }
607  else if(conn.state==TCPConnection::BYTE1) {
608    int bytes=read(conn.fd,conn.data+1,1);
609    if(bytes==1) {
610      conn.state=TCPConnection::GETQUESTION;
611      conn.qlen=(conn.data[0]<<8)+conn.data[1];
612      conn.bytesread=0;
613    }
614    if(!bytes || bytes < 0) {
615      if(g_logCommonErrors)
616        L<<Logger::Error<<"TCP client "<<sockAddrToString(&conn.remote,sizeof(conn.remote))<<" disconnected after first byte"<<endl;
617      TCPConnection tmp(conn); 
618      g_fdm->removeReadFD(fd);
619      tmp.closeAndCleanup();  // conn loses validity here..
620      return;
621    }
622  }
623  else if(conn.state==TCPConnection::GETQUESTION) {
624    int bytes=read(conn.fd,conn.data + conn.bytesread,conn.qlen - conn.bytesread);
625    if(!bytes || bytes < 0) {
626      L<<Logger::Error<<"TCP client "<<sockAddrToString(&conn.remote,sizeof(conn.remote))<<" disconnected while reading question body"<<endl;
627      TCPConnection tmp(conn);
628      g_fdm->removeReadFD(fd);
629      tmp.closeAndCleanup();  // conn loses validity here..
630
631      return;
632    }
633    conn.bytesread+=bytes;
634    if(conn.bytesread==conn.qlen) {
635      conn.state=TCPConnection::DONE;        // this makes us immune from timeouts, from now on *we* are responsible
636      DNSComboWriter* dc=0;
637      try {
638        dc=new DNSComboWriter(conn.data, conn.qlen, g_now);
639      }
640      catch(MOADNSException &mde) {
641        g_stats.clientParseError++; 
642        L<<Logger::Error<<"Unable to parse packet from TCP client "<<sockAddrToString(&conn.remote,sizeof(conn.remote))<<endl;
643        TCPConnection tmp(conn); 
644        g_fdm->removeReadFD(fd);
645        tmp.closeAndCleanup();
646        return;
647      }
648     
649      dc->setSocket(conn.fd);
650      dc->d_tcp=true;
651      dc->setRemote((struct sockaddr *)&conn.remote,sizeof(conn.remote));
652      if(dc->d_mdp.d_header.qr)
653        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
654      else {
655        ++g_stats.qcounter;
656        ++g_stats.tcpqcounter;
657        MT->makeThread(startDoResolve, dc);
658        return;
659      }
660    }
661  }
662}
663
664//! Handle new incoming TCP connection
665void handleNewTCPQuestion(int fd, boost::any& )
666{
667  struct sockaddr_in addr;
668  socklen_t addrlen=sizeof(addr);
669  int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
670  if(newsock>0) {
671    if(g_allowFrom && !g_allowFrom->match(&addr)) {
672      g_stats.unauthorizedTCP++;
673      close(newsock);
674      return;
675    }
676   
677    if(g_maxTCPPerClient && g_tcpClientCounts.count(addr.sin_addr.s_addr) && g_tcpClientCounts[addr.sin_addr.s_addr] >= g_maxTCPPerClient) {
678      g_stats.tcpClientOverflow++;
679      close(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
680      return;
681    }
682    g_tcpClientCounts[addr.sin_addr.s_addr]++;
683    Utility::setNonBlocking(newsock);
684    TCPConnection tc;
685    tc.fd=newsock;
686    tc.state=TCPConnection::BYTE0;
687    tc.remote=addr;
688    tc.startTime=g_now.tv_sec;
689    TCPConnection::s_currentConnections++;
690    g_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
691    struct timeval now;
692    gettimeofday(&now, 0);
693    g_fdm->setReadTTD(tc.fd, now, g_tcpTimeout);
694  }
695}
696
697void makeTCPServerSockets()
698{
699  vector<string>locals;
700  stringtok(locals,::arg()["local-address"]," ,");
701
702  if(locals.empty())
703    throw AhuException("No local address specified");
704 
705  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
706    g_tcpListenSocket=socket(AF_INET, SOCK_STREAM,0);
707    if(g_tcpListenSocket<0) 
708      throw AhuException("Making a server socket for resolver: "+stringerror());
709 
710    struct sockaddr_in sin;
711    memset((char *)&sin,0, sizeof(sin));
712   
713    sin.sin_family = AF_INET;
714    if(!IpToU32(*i, &sin.sin_addr.s_addr))
715      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
716
717    int tmp=1;
718    if(setsockopt(g_tcpListenSocket,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
719      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
720      exit(1);
721    }
722   
723#ifdef TCP_DEFER_ACCEPT
724    if(setsockopt(g_tcpListenSocket, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
725      L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
726    }
727#endif
728
729    sin.sin_port = htons(::arg().asNum("local-port")); 
730   
731    if (::bind(g_tcpListenSocket, (struct sockaddr *)&sin, sizeof(sin))<0) 
732      throw AhuException("Binding TCP server socket for "+*i+": "+stringerror());
733   
734    Utility::setNonBlocking(g_tcpListenSocket);
735    setSendBuffer(g_tcpListenSocket, 65000);
736    listen(g_tcpListenSocket, 128);
737    g_fdm->addReadFD(g_tcpListenSocket, handleNewTCPQuestion);
738    L<<Logger::Error<<"Listening for TCP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<::arg().asNum("local-port")<<endl;
739  }
740}
741
742void handleNewUDPQuestion(int fd, boost::any& var)
743{
744  int d_len;
745  char data[1500];
746  struct sockaddr_in fromaddr;
747  socklen_t addrlen=sizeof(fromaddr);
748
749  while((d_len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
750    if(g_allowFrom && !g_allowFrom->match(&fromaddr)) {
751      g_stats.unauthorizedUDP++;
752      continue;
753    }
754   
755    try {
756      DNSComboWriter* dc = new DNSComboWriter(data, d_len, g_now);
757     
758      dc->setRemote((struct sockaddr *)&fromaddr, addrlen);
759     
760      if(dc->d_mdp.d_header.qr) {
761        if(g_logCommonErrors)
762          L<<Logger::Error<<"Ignoring answer from "<<dc->getRemote()<<" on server socket!"<<endl;
763      }
764      else {
765        ++g_stats.qcounter;
766        dc->setSocket(fd);
767        dc->d_tcp=false;
768        MT->makeThread(startDoResolve, (void*) dc);
769      }
770    }
771    catch(MOADNSException& mde) {
772      g_stats.clientParseError++; 
773      L<<Logger::Error<<"Unable to parse packet from remote UDP client "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<": "<<mde.what()<<endl;
774    }
775  }
776}
777
778
779void makeUDPServerSockets()
780{
781  vector<string>locals;
782  stringtok(locals,::arg()["local-address"]," ,");
783
784  if(locals.empty())
785    throw AhuException("No local address specified");
786 
787  if(::arg()["local-address"]=="0.0.0.0") {
788    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
789  }
790
791  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
792    int fd=socket(AF_INET, SOCK_DGRAM,0);
793    if(fd<0) 
794      throw AhuException("Making a server socket for resolver: "+stringerror());
795    setReceiveBuffer(fd, 200000);
796    struct sockaddr_in sin;
797    memset((char *)&sin,0, sizeof(sin));
798   
799    sin.sin_family = AF_INET;
800    if(!IpToU32(*i, &sin.sin_addr.s_addr))
801      throw AhuException("Unable to resolve local address '"+ *i +"'"); 
802   
803    sin.sin_port = htons(::arg().asNum("local-port")); 
804   
805    if (::bind(fd, (struct sockaddr *)&sin, sizeof(sin))<0) 
806      throw AhuException("Resolver binding to server socket for "+*i+": "+stringerror());
807   
808    Utility::setNonBlocking(fd);
809    g_fdm->addReadFD(fd, handleNewUDPQuestion);
810    L<<Logger::Error<<"Listening for UDP queries on "<<inet_ntoa(sin.sin_addr)<<":"<<::arg().asNum("local-port")<<endl;
811  }
812}
813
814
815#ifndef WIN32
816void daemonize(void)
817{
818  if(fork())
819    exit(0); // bye bye
820 
821  setsid(); 
822
823  // cleanup open fds, but skip sockets
824  close(0);
825  close(1);
826  close(2);
827}
828#endif
829
uint64_t counter;
// Set from usr1Handler and cleared by doStats().
// NOTE(review): a flag written from a signal handler should be 'volatile sig_atomic_t';
// a plain bool is not guaranteed to be safe here.
bool statsWanted;


//! Signal handler (presumably installed for SIGUSR1): only raises statsWanted. Presumably
//! the main loop notices the flag and calls doStats().
void usr1Handler(int)
{
  statsWanted=true;
}
838
839
840
841void usr2Handler(int)
842{
843  SyncRes::setLog(true);
844  g_quiet=false;
845  ::arg().set("quiet")="no";
846
847}
848
849void doStats(void)
850{
851  if(g_stats.qcounter) {
852    L<<Logger::Error<<"stats: "<<g_stats.qcounter<<" questions, "<<RC.size()<<" cache entries, "<<SyncRes::s_negcache.size()<<" negative entries, "
853     <<(int)((RC.cacheHits*100.0)/(RC.cacheHits+RC.cacheMisses))<<"% cache hits"<<endl;
854    L<<Logger::Error<<"stats: throttle map: "<<SyncRes::s_throttle.size()<<", ns speeds: "
855     <<SyncRes::s_nsSpeeds.size()<<endl; // ", bytes: "<<RC.bytes()<<endl;
856    L<<Logger::Error<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
857    L<<Logger::Error<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
858     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
859    L<<Logger::Error<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<MT->numProcesses()<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
860  }
861  else if(statsWanted) 
862    L<<Logger::Error<<"stats: no stats yet!"<<endl;
863
864  statsWanted=false;
865}
866
867static void houseKeeping(void *)
868{
869  static time_t last_stat, last_rootupdate, last_prune;
870  struct timeval now;
871  gettimeofday(&now, 0);
872
873  if(now.tv_sec - last_prune > 300) { 
874    DTime dt;
875    dt.setTimeval(now);
876    RC.doPrune();
877   
878    typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
879    negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(SyncRes::s_negcache);
880
881    negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
882    ttdindex.erase(ttdindex.begin(), i);
883
884    time_t limit=now.tv_sec-300;
885    for(SyncRes::nsspeeds_t::iterator i = SyncRes::s_nsSpeeds.begin() ; i!= SyncRes::s_nsSpeeds.end(); )
886      if(i->second.stale(limit))
887        SyncRes::s_nsSpeeds.erase(i++);
888      else
889        ++i;
890
891    //   cerr<<"Pruned "<<pruned<<" records, left "<<SyncRes::s_negcache.size()<<"\n";
892//    cout<<"Prune took "<<dt.udiff()<<"usec\n";
893    last_prune=time(0);
894  }
895  if(now.tv_sec - last_stat>1800) { 
896    doStats();
897    last_stat=time(0);
898  }
899  if(now.tv_sec -last_rootupdate>7200) {
900    SyncRes sr(now);
901    vector<DNSResourceRecord> ret;
902
903    sr.setNoCache();
904    int res=sr.beginResolve(".", QType(QType::NS), ret);
905    if(!res) {
906      L<<Logger::Error<<"Refreshed . records"<<endl;
907      last_rootupdate=now.tv_sec;
908    }
909    else
910      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
911  }
912}
913
914
915
// Disabled debugging aid: overrides gettimeofday() to count its distinct call sites (via
// backtrace()) in s_cases, plus a fragment that dumps those counts sorted by frequency.
// NOTE(review): this would not compile if enabled. The dump statements below sit at file
// scope (presumably they belonged in a reporting function) and use casesptr before it is
// declared. Kept for reference only.
#if 0
#include <execinfo.h>

  // dump fragment: invert casesptr (trace -> count) into count -> trace and print it
  multimap<uint32_t,string> rev;
  for(map<string,uint32_t>::const_iterator i=casesptr->begin(); i!=casesptr->end(); ++i) {
    rev.insert(make_pair(i->second,i->first));
  }
  for(multimap<uint32_t,string>::const_iterator i=rev.begin(); i!= rev.end(); ++i)
    cout<<i->first<<" times: \n"<<i->second<<"\n";

  cout.flush();

map<string,uint32_t>* casesptr;
// Returns the current call stack as newline-separated symbol strings.
// NOTE(review): only 5 frames are captured despite the 20-slot array, and the
// backtrace_symbols() result is never freed.
static string maketrace()
{
  void *array[20]; //only care about last 17 functions (3 taken with tracing support)
  size_t size;
  char **strings;
  size_t i;

  size = backtrace (array, 5);
  strings = backtrace_symbols (array, size); //Need -rdynamic gcc (linker) flag for this to work

  string ret;

  for (i = 0; i < size; i++) //skip useless functions
    ret+=string(strings[i])+"\n";
  return ret;
}

extern "C" {

// Counting replacement for gettimeofday(): records the caller's trace and fills in only
// tv_sec (from time(0)); tv_usec and the timezone are left untouched.
int gettimeofday (struct timeval *__restrict __tv,
                  __timezone_ptr_t __tz)
{
  static map<string, uint32_t> s_cases;
  casesptr=&s_cases;
  s_cases[maketrace()]++;
  __tv->tv_sec=time(0);
  return 0;
}

}
#endif
960
/** Render the question name of a raw DNS packet as a dotted string, e.g. "www.example.com.".
    @param packet raw packet bytes; the question starts after the 12-byte header
    @param len    number of valid bytes in 'packet'
    @return the name read so far. That is "." for the root name and for packets too short to
            hold a question. Parsing stops when a label would run past 'len'. Compression
            pointers are not followed. */
string questionExpand(const char* packet, uint16_t len)
{
  const unsigned int headerlen=12;
  string ret;
  if(len > headerlen) {   // also prevents reserve(len-12) from wrapping for short packets
    const char* end=packet+len;
    const char* pos=packet+headerlen;
    ret.reserve(len-headerlen);
    // test pos before dereferencing: the length byte itself may lie beyond the packet
    while(pos < end) {
      unsigned char labellen=*pos++;
      if(!labellen || pos+labellen > end)
        break;
      ret.append(pos, labellen);
      ret.append(1,'.');
      pos+=labellen;
    }
  }
  if(ret.empty())
    ret=".";
  return ret;
}
979
980
981void handleRCC(int fd, boost::any& var)
982{
983  string remote;
984  string msg=s_rcc.recv(&remote);
985  RecursorControlParser rcp;
986  RecursorControlParser::func_t* command;
987  string answer=rcp.getAnswer(msg, &command);
988  s_rcc.send(answer, &remote);
989  command();
990}
991
992void handleTCPClientReadable(int fd, boost::any& var)
993{
994  PacketID& pident=any_cast<PacketID&>(var);
995  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident.inNeeded: "<<pident.inNeeded<<", "<<pident.sock->getHandle()<<endl;
996
997  shared_array<char> buffer(new char[pident.inNeeded]);
998
999  int ret=read(fd, buffer.get(), pident.inNeeded);
1000  if(ret > 0) {
1001    pident.inMSG.append(&buffer[0], &buffer[ret]);
1002    pident.inNeeded-=ret;
1003    if(!pident.inNeeded) {
1004      //      cerr<<"Got entire load of "<<pident.inMSG.size()<<" bytes"<<endl;
1005      PacketID pid=pident;
1006      string msg=pident.inMSG;
1007     
1008      g_fdm->removeReadFD(fd);
1009      MT->sendEvent(pid, &msg); 
1010    }
1011    else {
1012      //      cerr<<"Still have "<<pident.inNeeded<<" left to go"<<endl;
1013    }
1014  }
1015  else {
1016    PacketID tmp=pident;
1017    g_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1018    string empty;
1019    MT->sendEvent(tmp, &empty); // this conveys error status
1020  }
1021}
1022
1023void handleTCPClientWritable(int fd, boost::any& var)
1024{
1025  PacketID& pid=any_cast<PacketID&>(var);
1026 
1027  int ret=write(fd, pid.outMSG.c_str(), pid.outMSG.size() - pid.outPos);
1028  if(ret > 0) {
1029    pid.outPos+=ret;
1030    if(pid.outPos==pid.outMSG.size()) {
1031      PacketID tmp=pid;
1032      g_fdm->removeWriteFD(fd);
1033      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1034    }
1035  }
1036  else {  // error or EOF
1037    PacketID tmp(pid);
1038    g_fdm->removeWriteFD(fd);
1039    string sent;
1040    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1041  }
1042}
1043
1044void handleUDPServerResponse(int fd, boost::any& var)
1045{
1046  PacketID& pid=any_cast<PacketID&>(var);
1047  int len;
1048  char data[1500];
1049  struct sockaddr_in fromaddr;
1050  socklen_t addrlen=sizeof(fromaddr);
1051
1052  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1053  g_udpclientsocks.returnSocket(fd);
1054
1055  if(len < (int)sizeof(dnsheader)) {
1056    if(len < 0)
1057      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1058    else {
1059      g_stats.serverParseError++; 
1060      if(g_logCommonErrors)
1061        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen) <<
1062          ": packet smalller than DNS header"<<endl;
1063    }
1064    string empty;
1065    MT->sendEvent(pid, &empty); // this denotes error
1066    return;
1067  } 
1068
1069  dnsheader dh;
1070  memcpy(&dh, data, sizeof(dh));
1071 
1072  if(!dh.qdcount) // UPC, Nominum?
1073    return;
1074 
1075  if(dh.qr) {
1076    PacketID pident;
1077    pident.remote=fromaddr;
1078    pident.id=dh.id;
1079    pident.fd=fd;
1080    pident.domain=questionExpand(data, len); // don't copy this from above - we need to do the actual read
1081    string packet;
1082    packet.assign(data, len);
1083    if(!MT->sendEvent(pident, &packet)) {
1084      // if(g_logCommonErrors)
1085      //   L<<Logger::Warning<<"Discarding unexpected packet from "<<sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen)<<endl;
1086      g_stats.unexpectedCount++;
1087     
1088      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1089        if(pident.fd==mthread->key.fd && !memcmp(&mthread->key.remote.sin_addr, &pident.remote.sin_addr, sizeof(pident.remote.sin_addr)) && 
1090           !strcasecmp(pident.domain.c_str(), mthread->key.domain.c_str())) {
1091          mthread->key.nearMisses++;
1092        }
1093      }
1094    }
1095  }
1096  else
1097    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr, addrlen)  <<endl;
1098}
1099
1100#if 0
1101// this code sweeps all running tcp connections for timeouts, it didn't survive move to multiplexer
1102      vector<TCPConnection> sweeped;
1103
1104      for(vector<TCPConnection>::iterator i=g_tcpconnections.begin();i!=g_tcpconnections.end();++i) {
1105        if(i->state==TCPConnection::DONE || g_now.tv_sec < i->startTime + tcpTimeout) {  // don't timeout when we are working on the question!
1106          sweeped.push_back(*i);
1107          if(i->state!=TCPConnection::DONE) { // we don't listen for data when we are processing the question
1108            FD_SET(i->fd, &readfds);
1109            fdmax=max(fdmax,i->fd);
1110          }
1111        }
1112        else {
1113          if(g_logCommonErrors)
1114            L<<Logger::Error<<"TCP timeout from client "<<inet_ntoa(i->remote.sin_addr)<<endl;
1115          i->closeAndCleanup();
1116        }
1117      }
1118      sweeped.swap(g_tcpconnections);
1119#endif
1120
1121FDMultiplexer* getMultiplexer()
1122{
1123  FDMultiplexer* ret;
1124  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1125      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1126    try {
1127      ret=i->second();
1128      L<<Logger::Error<<"Enabled '"<<ret->getName()<<"' multiplexer"<<endl;
1129      return ret;
1130    }
1131    catch(...)
1132      {}
1133  }
1134  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1135  exit(1);
1136}
1137
1138int main(int argc, char **argv) 
1139{
1140  reportBasicTypes();
1141
1142  int ret = EXIT_SUCCESS;
1143#ifdef WIN32
1144    WSADATA wsaData;
1145    WSAStartup( MAKEWORD( 2, 0 ), &wsaData );
1146#endif // WIN32
1147
1148  try {
1149    Utility::srandom(time(0));
1150    ::arg().set("soa-minimum-ttl","Don't change")="0";
1151    ::arg().set("soa-serial-offset","Don't change")="0";
1152    ::arg().set("no-shuffle","Don't change")="off";
1153    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1154    ::arg().set("local-port","port to listen on")="53";
1155    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas")="127.0.0.1";
1156    ::arg().set("trace","if we should output heaps of logging")="off";
1157    ::arg().set("daemon","Operate as a daemon")="yes";
1158    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1159    ::arg().set("chroot","switch to chroot jail")="";
1160    ::arg().set("setgid","If set, change group id to this gid for more security")="";
1161    ::arg().set("setuid","If set, change user id to this uid for more security")="";
1162    ::arg().set("quiet","Suppress logging of questions and answers")="true";
1163    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
1164    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
1165    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
1166    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
1167    //    ::arg().set("query-local-port","Source port address for sending queries, defaults to random")="";
1168    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
1169    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
1170    ::arg().set("hint-file", "If set, load root hints from this file")="";
1171    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="0";
1172    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")="127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12";
1173    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
1174    ::arg().set("fork", "If set, fork the daemon for possible double performance")="no";
1175    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
1176    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
1177
1178    ::arg().setCmd("help","Provide a helpful message");
1179    L.toConsole(Logger::Warning);
1180    ::arg().laxParse(argc,argv); // do a lax parse
1181
1182    string configname=::arg()["config-dir"]+"/recursor.conf";
1183    cleanSlashes(configname);
1184
1185    if(!::arg().file(configname.c_str())) 
1186      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
1187
1188    ::arg().parse(argc,argv);
1189
1190    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
1191
1192    if(::arg().mustDo("help")) {
1193      cerr<<"syntax:"<<endl<<endl;
1194      cerr<<::arg().helpstring(::arg()["help"])<<endl;
1195      exit(99);
1196    }
1197
1198    L.setName("pdns_recursor");
1199
1200
1201    L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2006 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1202#ifdef __GNUC__
1203    L<<", gcc "__VERSION__;
1204#endif // add other compilers here
1205    L<<") starting up"<<endl;
1206
1207    L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1208    L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1209      "This is free software, and you are welcome to redistribute it "
1210      "according to the terms of the GPL version 2."<<endl;
1211   
1212    g_fdm=getMultiplexer();
1213
1214    if(!::arg()["allow-from"].empty()) {
1215      g_allowFrom=new NetmaskGroup;
1216      vector<string> ips;
1217      stringtok(ips, ::arg()["allow-from"], ", ");
1218      L<<Logger::Warning<<"Only allowing queries from: ";
1219      for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1220        g_allowFrom->addMask(*i);
1221        if(i!=ips.begin())
1222          L<<Logger::Warning<<", ";
1223        L<<Logger::Warning<<*i;
1224      }
1225      L<<Logger::Warning<<endl;
1226    }
1227    else if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53)
1228      L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1229   
1230    g_quiet=::arg().mustDo("quiet");
1231    if(::arg().mustDo("trace")) {
1232      SyncRes::setLog(true);
1233      ::arg().set("quiet")="no";
1234      g_quiet=false;
1235    }
1236
1237    g_logCommonErrors=::arg().mustDo("log-common-errors");
1238
1239    makeUDPServerSockets();
1240    makeTCPServerSockets();
1241
1242    if(::arg().mustDo("fork")) {
1243      fork();
1244      L<<Logger::Warning<<"This is forked pid "<<getpid()<<endl;
1245    }
1246
1247
1248    makeControlChannelSocket();
1249   
1250    MT=new MTasker<PacketID,string>(100000);
1251
1252   
1253    PacketID pident;
1254    primeHints();   
1255    L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1256#ifndef WIN32
1257    if(::arg().mustDo("daemon")) {
1258      L.toConsole(Logger::Critical);
1259      daemonize();
1260    }
1261    signal(SIGUSR1,usr1Handler);
1262    signal(SIGUSR2,usr2Handler);
1263    signal(SIGPIPE,SIG_IGN);
1264
1265    writePid();
1266#endif
1267
1268    int newgid=0;
1269    if(!::arg()["setgid"].empty())
1270      newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1271    int newuid=0;
1272    if(!::arg()["setuid"].empty())
1273      newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1274
1275
1276    if (!::arg()["chroot"].empty()) {
1277        if (chroot(::arg()["chroot"].c_str())<0) {
1278            L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1279            exit(1);
1280        }
1281    }
1282
1283    Utility::dropPrivs(newuid, newgid);
1284
1285
1286    counter=0;
1287    unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1288    g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1289
1290    g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1291
1292    g_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1293    bool listenOnTCP(true);
1294
1295    for(;;) {
1296      while(MT->schedule()); // housekeeping, let threads do their thing
1297     
1298      if(!(counter%500)) {
1299        MT->makeThread(houseKeeping,0);
1300      }
1301
1302      if(!(counter%11)) {
1303        typedef vector<pair<int, boost::any> > expired_t;
1304        expired_t expired=g_fdm->getTimeouts(g_now);
1305       
1306        for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1307          TCPConnection conn=any_cast<TCPConnection>(i->second);
1308          if(conn.state != TCPConnection::DONE) {
1309            g_fdm->removeReadFD(i->first);
1310            conn.closeAndCleanup();
1311          }
1312        }
1313      }
1314     
1315      counter++;
1316
1317      if(statsWanted) {
1318        doStats();
1319      }
1320
1321      gettimeofday(&g_now, 0);
1322      g_fdm->run(&g_now);
1323
1324
1325      if(listenOnTCP) {
1326        if(TCPConnection::s_currentConnections > maxTcpClients) {
1327          g_fdm->removeReadFD(g_tcpListenSocket);
1328          listenOnTCP=false;
1329        }
1330      }
1331      else {
1332        if(TCPConnection::s_currentConnections <= maxTcpClients) {
1333          g_fdm->addReadFD(g_tcpListenSocket, handleNewTCPQuestion);
1334          listenOnTCP=true;
1335        }
1336      }
1337    }
1338  }
1339  catch(AhuException &ae) {
1340    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1341    ret=EXIT_FAILURE;
1342  }
1343  catch(exception &e) {
1344    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1345    ret=EXIT_FAILURE;
1346  }
1347  catch(...) {
1348    L<<Logger::Error<<"any other exception in main: "<<endl;
1349    ret=EXIT_FAILURE;
1350  }
1351 
1352#ifdef WIN32
1353  WSACleanup();
1354#endif // WIN32
1355
1356  return ret;
1357}
Note: See TracBrowser for help on using the browser.