root/trunk/pdns/pdns/pdns_recursor.cc @ 1640

Revision 1640, 64.0 KB (checked in by ahu, 3 years ago)

perform better accounting of TCP client numbers on DNS errors - spotted by Simon Bedford and Laurent Papier.

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2010  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#ifndef WIN32
20# include <netdb.h>
21# include <sys/stat.h>
22# include <unistd.h>
23#else
24 #include "ntservice.hh"
25 #include "recursorservice.hh"
26#endif // WIN32
27
28#include <boost/foreach.hpp>
29#include <pthread.h>
30#include "recpacketcache.hh"
31#include "utility.hh"
32#include "dns_random.hh"
33#include <iostream>
34#include <errno.h>
35#include <map>
36#include <set>
37#include "recursor_cache.hh"
38#include <stdio.h>
39#include <signal.h>
40#include <stdlib.h>
41#include "misc.hh"
42#include "mtasker.hh"
43#include <utility>
44#include "arguments.hh"
45#include "syncres.hh"
46#include <fcntl.h>
47#include <fstream>
48#include "sstuff.hh"
49#include <boost/tuple/tuple.hpp>
50#include <boost/tuple/tuple_comparison.hpp>
51#include <boost/shared_array.hpp>
52#include <boost/lexical_cast.hpp>
53#include <boost/function.hpp>
54#include <boost/algorithm/string.hpp>
55#include <netinet/tcp.h>
56#include "dnsparser.hh"
57#include "dnswriter.hh"
58#include "dnsrecords.hh"
59#include "zoneparser-tng.hh"
60#include "rec_channel.hh"
61#include "logger.hh"
62#include "iputils.hh"
63#include "mplexer.hh"
64#include "config.h"
65#include "lua-pdns-recursor.hh"
66
67#ifndef RECURSOR
68#include "statbag.hh"
69StatBag S;
70#endif
71
// Per-thread (__thread) event multiplexer; the handlers in this file register
// and remove their socket callbacks through it.
72__thread FDMultiplexer* t_fdm;
// Per-thread id, printed as a prefix in the question/answer log lines.
73__thread unsigned int t_id;
// Maximum number of simultaneous TCP connections accepted from one client
// address; 0 disables the check (see handleNewTCPQuestion).
74unsigned int g_maxTCPPerClient;
// Timeout handed to MT->waitEvent() for outgoing network operations.
75unsigned int g_networkTimeoutMsec;
// When true, frequent client-side errors (unparseable packets, early
// disconnects, stray answers) are logged.
76bool g_logCommonErrors;
// Per-thread pointer to the Lua hook object; the preresolve/nxdomain hooks are
// only called when the shared_ptr actually holds an object.
77__thread shared_ptr<PowerDNSLua>* t_pdl;
// Per-thread RemoteKeeper; every incoming client address is passed to addRemote().
78__thread RemoteKeeper* t_remotes;
79
80RecursorControlChannel s_rcc; // only active in thread 0
81
// Pipe descriptors for communicating with one of our threads.
// NOTE(review): going by the member names, the "ToThread" pair carries data
// towards the thread and the "FromThread" pair carries data back - confirm
// against the code that fills g_pipes.
struct ThreadPipeSet
{
  int writeToThread;    // write end of the "to thread" pipe
  int readToThread;     // read end of the "to thread" pipe
  int writeFromThread;  // write end of the "from thread" pipe
  int readFromThread;   // read end of the "from thread" pipe
};
90
91vector<ThreadPipeSet> g_pipes; // effectively read-only after startup
92
93SyncRes::domainmap_t* g_initialDomainMap; // new threads need this to be set up
94
95#include "namespaces.hh"
96
// Per-thread record cache; startDoResolve bumps its cacheHits/cacheMisses counters.
97__thread MemRecursorCache* t_RC;
// Per-thread cache of complete UDP responses: consulted in handleNewUDPQuestion,
// filled in startDoResolve.
98__thread RecursorPacketCache* t_packetCache;
// Counters (queries, rcodes, latency buckets, drops, ...) updated throughout this file.
99RecursorStats g_stats;
// When false, every question and answer is logged.
100bool g_quiet;
101
// Per-thread allow list; when non-null, clients that do not match are dropped.
102static __thread NetmaskGroup* t_allowFrom;
103static NetmaskGroup* g_initialAllowFrom; // new threads need to be set up with this
104
105NetmaskGroup* g_dontQuery;
106string s_programname="pdns_recursor";
107
108typedef vector<int> tcpListenSockets_t;
109tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
// Value passed to setReadTTD() for TCP client sockets.
110int g_tcpTimeout;
// New questions are dropped (overCapacityDrops) once MT->numProcesses() exceeds this.
111unsigned int g_maxMThreads;
112struct timeval g_now; // timestamp, updated (too) frequently
// Maps a listening socket descriptor to the address it was bound to.
113map<int, ComboAddress> g_listenSocketsAddresses; // is shared across all threads right now
114
115__thread MT_t* MT; // the big MTasker
116
117unsigned int g_numThreads;
118
// Loopback, RFC 1918 private and IPv6 link-local ranges as one comma separated
// string. NOTE(review): its consumer is not visible in this part of the file.
119#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10"
120
121//! used to send information to a newborn mthread
122struct DNSComboWriter {
123  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), 
124                                                                                                        d_tcp(false), d_socket(-1)
125  {}
126  MOADNSParser d_mdp;
127  void setRemote(ComboAddress* sa)
128  {
129    d_remote=*sa;
130  }
131
132  void setSocket(int sock)
133  {
134    d_socket=sock;
135  }
136
137  string getRemote() const
138  {
139    return d_remote.toString();
140  }
141
142  struct timeval d_now;
143  ComboAddress d_remote;
144  bool d_tcp;
145  int d_socket;
146};
147
148
149ArgvMap &arg()
150{
151  static ArgvMap theArg;
152  return theArg;
153}
154
155
156void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
157
158// -1 is error, 0 is timeout, 1 is success
159int asendtcp(const string& data, Socket* sock) 
160{
161  PacketID pident;
162  pident.sock=sock;
163  pident.outMSG=data;
164 
165  t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
166  string packet;
167
168  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
169
170  if(!ret || ret==-1) { // timeout
171    t_fdm->removeWriteFD(sock->getHandle());
172  }
173  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
174    return -1;
175  }
176  return ret;
177}
178
179void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
180
181// -1 is error, 0 is timeout, 1 is success
182int arecvtcp(string& data, int len, Socket* sock) 
183{
184  data.clear();
185  PacketID pident;
186  pident.sock=sock;
187  pident.inNeeded=len;
188  t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
189
190  int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
191  if(!ret || ret==-1) { // timeout
192    t_fdm->removeReadFD(sock->getHandle());
193  }
194  else if(data.empty()) {// error, EOF or other
195    return -1;
196  }
197
198  return ret;
199}
200
201vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6; 
202const ComboAddress g_local4("0.0.0.0"), g_local6("::");
203
204//! pick a random query local address
205ComboAddress getQueryLocalAddress(int family, uint16_t port)
206{
207  ComboAddress ret;
208  if(family==AF_INET) {
209    if(g_localQueryAddresses4.empty()) 
210      ret = g_local4;
211    else 
212      ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
213    ret.sin4.sin_port = htons(port);
214  }
215  else {
216    if(g_localQueryAddresses6.empty())
217      ret = g_local6;
218    else
219      ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
220     
221    ret.sin6.sin6_port = htons(port);
222  }
223  return ret;
224}
225
226void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
227
228void setSocketBuffer(int fd, int optname, uint32_t size)
229{
230  uint32_t psize=0;
231  socklen_t len=sizeof(psize);
232 
233  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
234    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
235    return; 
236  }
237
238  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
239    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
240}
241
242
243static void setSocketReceiveBuffer(int fd, uint32_t size)
244{
245  setSocketBuffer(fd, SO_RCVBUF, size);
246}
247
248static void setSocketSendBuffer(int fd, uint32_t size)
249{
250  setSocketBuffer(fd, SO_SNDBUF, size);
251}
252
253
254// you can ask this class for a UDP socket to send a query from
255// this socket is not yours, don't even think about deleting it
256// but after you call 'returnSocket' on it, don't assume anything anymore
257class UDPClientSocks
258{
259  unsigned int d_numsocks;
260  unsigned int d_maxsocks;
261public:
262  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
263  {
264  }
265
266  typedef set<int> socks_t;
267  socks_t d_socks;
268
269  // returning -1 means: temporary OS error (ie, out of files), -2 means OS error
270  int getSocket(const ComboAddress& toaddr, int* fd)
271  {
272    *fd=makeClientSocket(toaddr.sin4.sin_family);
273    if(*fd < 0) // temporary error - receive exception otherwise
274      return -1;
275
276    if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
277      int err = errno;
278      //      returnSocket(*fd);
279      Utility::closesocket(*fd);
280      if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
281        return -2;
282      return -1;
283    }
284
285    d_socks.insert(*fd);
286    d_numsocks++;
287    return 0;
288  }
289
290  void returnSocket(int fd)
291  {
292    socks_t::iterator i=d_socks.find(fd);
293    if(i==d_socks.end()) {
294      throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
295    }
296    returnSocketLocked(i);
297  }
298
299  // return a socket to the pool, or simply erase it
300  void returnSocketLocked(socks_t::iterator& i)
301  {
302    if(i==d_socks.end()) {
303      throw AhuException("Trying to return a socket not in the pool");
304    }
305    try {
306      t_fdm->removeReadFD(*i);
307    }
308    catch(FDMultiplexerException& e) {
309      // we sometimes return a socket that has not yet been assigned to t_fdm
310    }
311    Utility::closesocket(*i);
312   
313    d_socks.erase(i++);
314    --d_numsocks;
315  }
316
317  // returns -1 for errors which might go away, throws for ones that won't
318  static int makeClientSocket(int family)
319  {
320    int ret=(int)socket(family, SOCK_DGRAM, 0);
321    if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
322      return ret;
323   
324    if(ret<0) 
325      throw AhuException("Making a socket for resolver: "+stringerror());
326
327   
328    int tries=10;
329    while(--tries) {
330      uint16_t port;
331     
332      if(tries==1)  // fall back to kernel 'random'
333        port = 0;
334      else
335        port = 1025 + dns_random(64510);
336     
337      ComboAddress sin=getQueryLocalAddress(family, port); // does htons for us
338
339      if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0) 
340        break;
341    }
342    if(!tries)
343      throw AhuException("Resolver binding to local query client socket: "+stringerror());
344   
345    Utility::setNonBlocking(ret);
346    return ret;
347  }
348};
349
350static __thread UDPClientSocks* t_udpclientsocks;
351
352/* these two functions are used by LWRes */
353// -2 is OS error, -1 is error that depends on the remote, > 0 is success
354int asendto(const char *data, int len, int flags, 
355            const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd) 
356{
357
358  PacketID pident;
359  pident.domain = domain;
360  pident.remote = toaddr;
361  pident.type = qtype;
362
363  // see if there is an existing outstanding request we can chain on to, using partial equivalence function
364  pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
365
366  for(; chain.first != chain.second; chain.first++) {
367    if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
368      /*
369      cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
370      cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
371          <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
372      */
373      chain.first->key.chain.insert(id); // we can chain
374      *fd=-1;                            // gets used in waitEvent / sendEvent later on
375      return 1;
376    }
377  }
378
379  int ret=t_udpclientsocks->getSocket(toaddr, fd);
380  if(ret < 0)
381    return ret;
382
383  pident.fd=*fd;
384  pident.id=id;
385 
386  t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
387  ret = send(*fd, data, len, 0);
388
389  int tmp = errno;
390
391  if(ret < 0)
392    t_udpclientsocks->returnSocket(*fd);
393
394  errno = tmp; // this is for logging purposes only
395  return ret;
396}
397
398// -1 is error, 0 is timeout, 1 is success
399int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len, 
400              uint16_t id, const string& domain, uint16_t qtype, int fd, struct timeval* now)
401{
402  static optional<unsigned int> nearMissLimit;
403  if(!nearMissLimit) 
404    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
405
406  PacketID pident;
407  pident.fd=fd;
408  pident.id=id;
409  pident.domain=domain;
410  pident.type = qtype;
411  pident.remote=fromaddr;
412
413  string packet;
414  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
415
416  if(ret > 0) {
417    if(packet.empty()) // means "error"
418      return -1; 
419
420    *d_len=(int)packet.size();
421    memcpy(data,packet.c_str(),min(len,*d_len));
422    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
423      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
424      g_stats.spoofCount++;
425      return -1;
426    }
427  }
428  else {
429    if(fd >= 0)
430      t_udpclientsocks->returnSocket(fd);
431  }
432  return ret;
433}
434
435
436string s_pidfname;
437static void writePid(void)
438{
439  ofstream of(s_pidfname.c_str(), ios_base::app);
440  if(of)
441    of<< Utility::getpid() <<endl;
442  else
443    L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
444}
445
446typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
447tcpClientCounts_t __thread* t_tcpClientCounts;
448
449
450void TCPConnection::closeAndCleanup(int fd, const ComboAddress& remote) 
451{
452  Utility::closesocket(fd);
453  if(!(*t_tcpClientCounts)[remote]--) 
454    t_tcpClientCounts->erase(remote);
455  s_currentConnections--;
456}
457void TCPConnection::closeAndCleanup()
458{
459  closeAndCleanup(fd, remote);
460}
461
462unsigned int TCPConnection::s_currentConnections; 
463void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
464
465void updateRcodeStats(int res)
466{
467  switch(res) {
468  case RCode::ServFail:
469    g_stats.servFails++;
470    break;
471  case RCode::NXDomain:
472    g_stats.nxDomains++;
473    break;
474  case RCode::NoError:
475    g_stats.noErrors++;
476    break;
477  }
478}
479
480void startDoResolve(void *p)
481{
482  DNSComboWriter* dc=(DNSComboWriter *)p;
483
484  try {
485    uint16_t maxudpsize=512;
486    EDNSOpts edo;
487    if(getEDNSOpts(dc->d_mdp, &edo)) {
488      maxudpsize=max(edo.d_packetsize, (uint16_t)1280);
489    }
490   
491    vector<DNSResourceRecord> ret;
492    vector<uint8_t> packet;
493
494    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); 
495
496    pw.getHeader()->aa=0;
497    pw.getHeader()->ra=1;
498    pw.getHeader()->qr=1;
499    pw.getHeader()->tc=0;
500    pw.getHeader()->id=dc->d_mdp.d_header.id;
501    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
502
503    SyncRes sr(dc->d_now);
504    if(!g_quiet)
505      L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
506       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
507
508    sr.setId(MT->getTid());
509    if(!dc->d_mdp.d_header.rd)
510      sr.setCacheOnly();
511
512    int res;
513
514    bool variableAnswer = false;
515    if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer)) {
516       res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
517
518      if(t_pdl->get()) {
519        if(res == RCode::NXDomain)
520          (*t_pdl)->nxdomain(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
521      }
522    }
523   
524    uint32_t minTTL=numeric_limits<uint32_t>::max();
525    if(res<0) {
526      pw.getHeader()->rcode=RCode::ServFail;
527      // no commit here, because no record
528      g_stats.servFails++;
529    }
530    else {
531      pw.getHeader()->rcode=res;
532      updateRcodeStats(res);
533   
534      if(ret.size()) {
535        shuffle(ret);
536       
537        for(vector<DNSResourceRecord>::const_iterator i=ret.begin(); i!=ret.end(); ++i) {
538          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place); 
539          minTTL = min(minTTL, i->ttl);
540          if(i->qtype.getCode() == QType::A) { // blast out A record w/o doing whole dnswriter thing
541            uint32_t ip=0;
542            IpToU32(i->content, &ip);
543            pw.xfr32BitInt(htonl(ip));
544          } else {
545            shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content)); 
546            drc->toPacket(pw);
547          }
548          if(!dc->d_tcp && pw.size() > maxudpsize) {
549            pw.rollback();
550            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
551              pw.getHeader()->tc=1;
552            goto sendit; // need to jump over pw.commit
553          }
554        }
555
556      pw.commit();
557      }
558    }
559  sendit:;
560    if(!dc->d_tcp) {
561      sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
562      if(!SyncRes::s_nopacketcache && !variableAnswer ) {
563        t_packetCache->insertResponsePacket(string((const char*)&*packet.begin(), packet.size()), g_now.tv_sec, 
564                                           min(minTTL, 
565                                               (pw.getHeader()->rcode == RCode::ServFail) ? SyncRes::s_packetcacheservfailttl : SyncRes::s_packetcachettl
566                                               ) 
567                                          );
568      }
569    }
570    else {
571      char buf[2];
572      buf[0]=packet.size()/256;
573      buf[1]=packet.size()%256;
574
575      Utility::iovec iov[2];
576
577      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
578      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
579
580      int ret=Utility::writev(dc->d_socket, iov, 2);
581      bool hadError=true;
582
583      if(ret == 0) 
584        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
585      else if(ret < 0 ) 
586        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
587      else if((unsigned int)ret != 2 + packet.size())
588        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
589      else
590        hadError=false;
591     
592      // update tcp connection status, either by closing or moving to 'BYTE0'
593   
594      if(hadError) {
595        // no need to remove us from FDM, we weren't there
596        TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
597        dc->d_socket = -1;
598      }
599      else {
600        TCPConnection tc;
601        tc.fd=dc->d_socket;
602        tc.state=TCPConnection::BYTE0;
603        tc.remote=dc->d_remote;
604        Utility::gettimeofday(&g_now, 0); // needs to be updated
605        tc.startTime=g_now.tv_sec;
606        t_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
607        t_fdm->setReadTTD(tc.fd, g_now, g_tcpTimeout);
608      }
609    }
610   
611    if(!g_quiet) {
612      L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
613      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
614      sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
615    }
616
617    sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++; 
618    float spent=makeFloat(sr.d_now-dc->d_now);
619    if(spent < 0.001)
620      g_stats.answers0_1++;
621    else if(spent < 0.010)
622      g_stats.answers1_10++;
623    else if(spent < 0.1)
624      g_stats.answers10_100++;
625    else if(spent < 1.0)
626      g_stats.answers100_1000++;
627    else
628      g_stats.answersSlow++;
629
630    uint64_t newLat=(uint64_t)(spent*1000000);
631    if(newLat < 1000000)  // outliers of several minutes exist..
632      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
633
634    delete dc;
635    dc=0;
636  }
637  catch(AhuException &ae) {
638    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
639    if(dc && dc->d_tcp && dc->d_socket >= 0) 
640      TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
641    delete dc;
642  }
643  catch(MOADNSException& e) {
644    if(dc && dc->d_tcp && dc->d_socket >= 0) 
645      TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
646     
647    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
648    delete dc;
649  }
650  catch(std::exception& e) {
651    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
652    if(dc && dc->d_tcp && dc->d_socket >= 0) 
653      TCPConnection::closeAndCleanup(dc->d_socket, dc->d_remote);
654    delete dc;
655  }
656  catch(...) {
657    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
658  }
659}
660
661void makeControlChannelSocket()
662{
663  string sockname=::arg()["socket-dir"]+"/pdns_recursor.controlsocket";
664  s_rcc.listen(sockname);
665 
666#ifndef WIN32
667  int sockowner = -1;
668  int sockgroup = -1;
669
670  if (!::arg().isEmpty("socket-group"))
671    sockgroup=::arg().asGid("socket-group");
672  if (!::arg().isEmpty("socket-owner"))
673    sockowner=::arg().asUid("socket-owner");
674 
675  if (sockgroup > -1 || sockowner > -1) {
676    if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
677      unixDie("Failed to chown control socket");
678    }
679  }
680
681  // do mode change if socket-mode is given
682  if(!::arg().isEmpty("socket-mode")) {
683    mode_t sockmode=::arg().asMode("socket-mode");
684    chmod(sockname.c_str(), sockmode);
685  }
686#endif
687}
688
689void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
690{
691  TCPConnection* conn=any_cast<TCPConnection>(&var);
692
693  if(conn->state==TCPConnection::BYTE0) {
694    int bytes=recv(conn->fd, conn->data, 2, 0);
695    if(bytes==1)
696      conn->state=TCPConnection::BYTE1;
697    if(bytes==2) { 
698      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
699      conn->bytesread=0;
700      conn->state=TCPConnection::GETQUESTION;
701    }
702    if(!bytes || bytes < 0) {
703      TCPConnection tmp(*conn); 
704      t_fdm->removeReadFD(fd);
705      tmp.closeAndCleanup();
706      return;
707    }
708  }
709  else if(conn->state==TCPConnection::BYTE1) {
710    int bytes=recv(conn->fd, conn->data+1, 1, 0);
711    if(bytes==1) {
712      conn->state=TCPConnection::GETQUESTION;
713      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
714      conn->bytesread=0;
715    }
716    if(!bytes || bytes < 0) {
717      if(g_logCommonErrors)
718        L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected after first byte"<<endl;
719      TCPConnection tmp(*conn); 
720      t_fdm->removeReadFD(fd);
721      tmp.closeAndCleanup();  // conn loses validity here..
722      return;
723    }
724  }
725  else if(conn->state==TCPConnection::GETQUESTION) {
726    int bytes=recv(conn->fd, conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
727    if(!bytes || bytes < 0) {
728      L<<Logger::Error<<"TCP client "<< conn->remote.toString() <<" disconnected while reading question body"<<endl;
729      TCPConnection tmp(*conn);
730      t_fdm->removeReadFD(fd);
731      tmp.closeAndCleanup();  // conn loses validity here..
732
733      return;
734    }
735    conn->bytesread+=bytes;
736    if(conn->bytesread==conn->qlen) {
737      TCPConnection tconn(*conn); 
738      t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
739
740      DNSComboWriter* dc=0;
741      try {
742        dc=new DNSComboWriter(tconn.data, tconn.qlen, g_now);
743      }
744      catch(MOADNSException &mde) {
745        g_stats.clientParseError++; 
746        if(g_logCommonErrors)
747          L<<Logger::Error<<"Unable to parse packet from TCP client "<< tconn.remote.toString() <<endl;
748        tconn.closeAndCleanup();
749        return;
750      }
751     
752      dc->setSocket(tconn.fd);
753      dc->d_tcp=true;
754      dc->setRemote(&tconn.remote);
755      if(dc->d_mdp.d_header.qr) {
756        delete dc;
757        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
758        tconn.closeAndCleanup();
759        return;
760      }
761      else {
762        ++g_stats.qcounter;
763        ++g_stats.tcpqcounter;
764        MT->makeThread(startDoResolve, dc); // deletes dc
765        return;
766      }
767    }
768  }
769}
770
771//! Handle new incoming TCP connection
772void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
773{
774  ComboAddress addr;
775  socklen_t addrlen=sizeof(addr);
776  int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
777  if(newsock>0) {
778    if(MT->numProcesses() > g_maxMThreads) {
779      g_stats.overCapacityDrops++;
780      Utility::closesocket(newsock);
781      return;
782    }
783
784    t_remotes->addRemote(addr);
785    if(t_allowFrom && !t_allowFrom->match(&addr)) {
786      if(!g_quiet) 
787        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
788
789      g_stats.unauthorizedTCP++;
790      Utility::closesocket(newsock);
791      return;
792    }
793    if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
794      g_stats.tcpClientOverflow++;
795      Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
796      return;
797    }
798    (*t_tcpClientCounts)[addr]++;
799    Utility::setNonBlocking(newsock);
800    TCPConnection tc;
801    tc.fd=newsock;
802    tc.state=TCPConnection::BYTE0;
803    tc.remote=addr;
804    tc.startTime=g_now.tv_sec;
805    TCPConnection::s_currentConnections++;
806    t_fdm->addReadFD(tc.fd, handleRunningTCPQuestion, tc);
807
808    struct timeval now;
809    Utility::gettimeofday(&now, 0);
810    t_fdm->setReadTTD(tc.fd, now, g_tcpTimeout);
811  }
812}
813 
814void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
815{
816  int len;
817  char data[1500];
818  ComboAddress fromaddr;
819  socklen_t addrlen=sizeof(fromaddr);
820 
821  if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
822    t_remotes->addRemote(fromaddr);
823
824    if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
825      if(!g_quiet) 
826        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
827
828      g_stats.unauthorizedUDP++;
829      return;
830    }
831    try {
832      dnsheader* dh=(dnsheader*)data;
833     
834      if(dh->qr) {
835        if(g_logCommonErrors)
836          L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
837      }
838      else {
839        ++g_stats.qcounter;
840
841        string response;
842        try {
843          uint32_t age;
844          if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) {
845            if(!g_quiet)
846              L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
847 
848            g_stats.packetCacheHits++;
849            SyncRes::s_queries++;
850            ageDNSPacket(response, age);
851            sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
852            if(response.length() >= sizeof(struct dnsheader))
853              updateRcodeStats(((struct dnsheader*)response.c_str())->rcode);
854            g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
855            return;
856          }
857        } 
858        catch(std::exception& e) {
859          throw MOADNSException(e.what()); // translate
860        }
861        if(MT->numProcesses() > g_maxMThreads) {
862          g_stats.overCapacityDrops++;
863          return;
864        }
865 
866        DNSComboWriter* dc = new DNSComboWriter(data, len, g_now);
867        dc->setSocket(fd);
868        dc->setRemote(&fromaddr);
869
870        dc->d_tcp=false;
871
872        MT->makeThread(startDoResolve, (void*) dc); // deletes dc
873      }
874    }
875    catch(MOADNSException& mde) {
876      g_stats.clientParseError++; 
877      if(g_logCommonErrors)
878        L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
879    }
880  }
881  else {
882    // cerr<<t_id<<" had error: "<<stringerror()<<endl;
883    if(errno == EAGAIN)
884      g_stats.noPacketError++;
885  }
886}
887
888typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
889deferredAdd_t deferredAdd;
890
891void makeTCPServerSockets()
892{
893  int fd;
894  vector<string>locals;
895  stringtok(locals,::arg()["local-address"]," ,");
896
897  if(locals.empty())
898    throw AhuException("No local address specified");
899 
900  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
901    ServiceTuple st;
902    st.port=::arg().asNum("local-port");
903    parseService(*i, st);
904   
905    ComboAddress sin;
906
907    memset((char *)&sin,0, sizeof(sin));
908    sin.sin4.sin_family = AF_INET;
909    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
910      sin.sin6.sin6_family = AF_INET6;
911      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
912        throw AhuException("Unable to resolve local address for TCP server on '"+ st.host +"'"); 
913    }
914
915    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
916    if(fd<0) 
917      throw AhuException("Making a TCP server socket for resolver: "+stringerror());
918
919    int tmp=1;
920    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
921      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
922      exit(1);
923    }
924   
925#ifdef TCP_DEFER_ACCEPT
926    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
927      if(i==locals.begin())
928        L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
929    }
930#endif
931
932    sin.sin4.sin_port = htons(st.port);
933    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
934    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) 
935      throw AhuException("Binding TCP server socket for "+ st.host +": "+stringerror());
936   
937    Utility::setNonBlocking(fd);
938    setSocketSendBuffer(fd, 65000);
939    listen(fd, 128);
940    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
941    g_tcpListenSockets.push_back(fd);
942
943    if(sin.sin4.sin_family == AF_INET) 
944      L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
945    else
946      L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
947  }
948}
949
950
951
952void makeUDPServerSockets()
953{
954  vector<string>locals;
955  stringtok(locals,::arg()["local-address"]," ,");
956
957  if(locals.empty())
958    throw AhuException("No local address specified");
959 
960  if(::arg()["local-address"]=="0.0.0.0") {
961    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
962  }
963
964  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
965    ServiceTuple st;
966    st.port=::arg().asNum("local-port");
967    parseService(*i, st);
968
969    ComboAddress sin;
970
971    memset(&sin, 0, sizeof(sin));
972    sin.sin4.sin_family = AF_INET;
973    if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
974      sin.sin6.sin6_family = AF_INET6;
975      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
976        throw AhuException("Unable to resolve local address for UDP server on '"+ st.host +"'"); 
977    }
978   
979    int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
980
981    if(fd < 0) {
982      throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
983    }
984
985    setSocketReceiveBuffer(fd, 200000);
986    sin.sin4.sin_port = htons(st.port);
987
988    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
989    if (::bind(fd, (struct sockaddr *)&sin, socklen)<0) 
990      throw AhuException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
991   
992    Utility::setNonBlocking(fd);
993
994    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
995    g_listenSocketsAddresses[fd]=sin;  // this is written to only from the startup thread, not from the workers
996    if(sin.sin4.sin_family == AF_INET) 
997      L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
998    else
999      L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1000  }
1001}
1002
1003
1004#ifndef WIN32
1005void daemonize(void)
1006{
1007  if(fork())
1008    exit(0); // bye bye
1009 
1010  setsid(); 
1011
1012  int i=open("/dev/null",O_RDWR); /* open stdin */
1013  if(i < 0) 
1014    L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1015  else {
1016    dup2(i,0); /* stdin */
1017    dup2(i,1); /* stderr */
1018    dup2(i,2); /* stderr */
1019    close(i);
1020  }
1021}
1022#endif
1023
// Iteration counter of the thread main loop, used there to run periodic tasks.
uint64_t counter;

// Raised by the SIGUSR1 handler, lowered again by doStats().
bool statsWanted;

// Signal handler for SIGUSR1: it only records that statistics were requested;
// the main loop notices the flag and calls doStats().
void usr1Handler(int /* signum */)
{
  statsWanted = true;
}
1031
1032void usr2Handler(int)
1033{
1034  SyncRes::setLog(true);
1035  g_quiet=false;
1036  ::arg().set("quiet")="no";
1037
1038}
1039
1040void doStats(void)
1041{
1042  static time_t lastOutputTime;
1043  static uint64_t lastQueryCount;
1044 
1045  if(g_stats.qcounter && (t_RC->cacheHits + t_RC->cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {  // this only runs once thread 0 has had hits
1046    uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
1047    uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
1048   
1049    L<<Logger::Warning<<"stats: "<<g_stats.qcounter<<" questions, "<<
1050      broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
1051      broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
1052      (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl; 
1053   
1054    L<<Logger::Warning<<"stats: throttle map: "
1055      << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
1056      << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl; 
1057    L<<Logger::Warning<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
1058    L<<Logger::Warning<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
1059     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
1060    L<<Logger::Warning<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
1061      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
1062
1063    //L<<Logger::Warning<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
1064      //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
1065   
1066    L<<Logger::Warning<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
1067    " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
1068   
1069    time_t now = time(0);
1070    if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
1071      L<<Logger::Warning<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
1072    }
1073    lastOutputTime = now;
1074    lastQueryCount = SyncRes::s_queries;
1075  }
1076  else if(statsWanted) 
1077    L<<Logger::Warning<<"stats: no stats yet!"<<endl;
1078
1079  statsWanted=false;
1080}
1081
1082static void houseKeeping(void *)
1083try
1084{
1085  static __thread time_t last_stat, last_rootupdate, last_prune;
1086  static __thread int cleanCounter=0;
1087  struct timeval now;
1088  Utility::gettimeofday(&now, 0);
1089
1090//  clog<<"* "<<t_id<<" "<<(void*)&last_stat<<"\t"<<(unsigned int)last_stat<<endl;
1091
1092  if(now.tv_sec - last_prune > (time_t)(5 + t_id)) { 
1093    DTime dt;
1094    dt.setTimeval(now);
1095    t_RC->doPrune(); // this function is local to a thread, so fine anyhow
1096    t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numThreads);
1097   
1098    {
1099      typedef SyncRes::negcache_t::nth_index<1>::type negcache_by_ttd_index_t;
1100      negcache_by_ttd_index_t& ttdindex=boost::multi_index::get<1>(t_sstorage->negcache); 
1101      negcache_by_ttd_index_t::iterator i=ttdindex.lower_bound(now.tv_sec);
1102      ttdindex.erase(ttdindex.begin(), i);
1103    }
1104   
1105    if(!((cleanCounter++)%40)) {  // this is a full scan!
1106      time_t limit=now.tv_sec-300;
1107      for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
1108        if(i->second.stale(limit))
1109          t_sstorage->nsSpeeds.erase(i++);
1110        else
1111          ++i;
1112    }
1113//    L<<Logger::Warning<<"Spent "<<dt.udiff()/1000<<" msec cleaning"<<endl;
1114    last_prune=time(0);
1115  }
1116 
1117  if(!t_id) {
1118    if(now.tv_sec - last_stat > 1800) { 
1119      doStats();
1120      last_stat=time(0);
1121    }
1122  }
1123 
1124  if(now.tv_sec - last_rootupdate > 7200) {
1125    SyncRes sr(now);
1126    sr.setDoEDNS0(true);
1127    vector<DNSResourceRecord> ret;
1128
1129    sr.setNoCache();
1130    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
1131    if(!res) {
1132      L<<Logger::Warning<<"Refreshed . records"<<endl;
1133      last_rootupdate=now.tv_sec;
1134    }
1135    else
1136      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
1137  }
1138}
1139catch(AhuException& ae)
1140{
1141  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
1142  throw;
1143}
1144;
1145
1146void makeThreadPipes()
1147{
1148  for(unsigned int n=0; n < g_numThreads; ++n) {
1149    struct ThreadPipeSet tps;
1150    int fd[2];
1151    if(pipe(fd) < 0)
1152      unixDie("Creating pipe for inter-thread communications");
1153   
1154    tps.readToThread = fd[0];
1155    tps.writeToThread = fd[1];
1156   
1157    if(pipe(fd) < 0)
1158      unixDie("Creating pipe for inter-thread communications");
1159    tps.readFromThread = fd[0];
1160    tps.writeFromThread = fd[1];
1161   
1162    g_pipes.push_back(tps);
1163  }
1164}
1165
1166void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1167{
1168  unsigned int n = 0;
1169  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1170  {
1171    if(n++ == t_id) {
1172      if(!skipSelf)
1173        func(); // don't write to ourselves!
1174      continue;
1175    }
1176     
1177    pipefunc_t *funcptr = new pipefunc_t(func);
1178    if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr))
1179      unixDie("write to thread pipe returned wrong size or error");
1180   
1181    string* resp;
1182    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1183      unixDie("read from thread pipe returned wrong size or error");
1184   
1185    if(resp) {
1186//      cerr <<"got response: " << *resp << endl;
1187      delete resp;
1188    }
1189  }
1190}
1191
1192
1193
1194void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
1195{
1196  pipefunc_t* func;
1197  if(read(fd, &func, sizeof(func)) != sizeof(func)) { // fd == readToThread
1198    unixDie("read from thread pipe returned wrong size or error");
1199  }
1200 
1201  void *resp = (*func)();
1202 
1203  if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
1204    unixDie("write to thread pipe returned wrong size or error");
1205 
1206  delete func;
1207}
1208
1209template<class T> void *voider(const boost::function<T*()>& func)
1210{
1211  return func();
1212}
1213
1214vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
1215{
1216  a.insert(a.end(), b.begin(), b.end());
1217  return a;
1218}
1219
1220
1221
1222template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
1223{
1224  unsigned int n = 0;
1225  T ret=T();
1226  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1227  {
1228    if(n++ == t_id) {
1229      if(!skipSelf) {
1230        T* resp = (T*)func(); // don't write to ourselves!
1231        if(resp) {
1232          //~ cerr <<"got direct: " << *resp << endl;
1233          ret += *resp;
1234          delete resp;
1235        }
1236      }
1237      continue;
1238    }
1239     
1240    pipefunc_t *funcptr = new pipefunc_t(boost::bind(voider<T>, func));
1241    if(write(tps.writeToThread, &funcptr, sizeof(funcptr)) != sizeof(funcptr))
1242      unixDie("write to thread pipe returned wrong size or error");
1243   
1244    T* resp;
1245    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1246      unixDie("read from thread pipe returned wrong size or error");
1247   
1248    if(resp) {
1249      //~ cerr <<"got response: " << *resp << endl;
1250      ret += *resp;
1251      delete resp;
1252    }
1253  }
1254  return ret;
1255}
1256
1257template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
1258template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
1259template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
1260
1261void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
1262{
1263  string remote;
1264  string msg=s_rcc.recv(&remote);
1265  RecursorControlParser rcp;
1266  RecursorControlParser::func_t* command;
1267  string answer=rcp.getAnswer(msg, &command);
1268  try {
1269    s_rcc.send(answer, &remote);
1270    command();
1271  }
1272  catch(std::exception& e) {
1273    L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1274  }
1275  catch(AhuException& ae) {
1276    L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1277  }
1278}
1279
1280void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
1281{
1282  PacketID* pident=any_cast<PacketID>(&var);
1283  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1284
1285  shared_array<char> buffer(new char[pident->inNeeded]);
1286
1287  int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1288  if(ret > 0) {
1289    pident->inMSG.append(&buffer[0], &buffer[ret]);
1290    pident->inNeeded-=ret;
1291    if(!pident->inNeeded) {
1292      //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1293      PacketID pid=*pident;
1294      string msg=pident->inMSG;
1295     
1296      t_fdm->removeReadFD(fd);
1297      MT->sendEvent(pid, &msg); 
1298    }
1299    else {
1300      //      cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1301    }
1302  }
1303  else {
1304    PacketID tmp=*pident;
1305    t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1306    string empty;
1307    MT->sendEvent(tmp, &empty); // this conveys error status
1308  }
1309}
1310
1311void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
1312{
1313  PacketID* pid=any_cast<PacketID>(&var);
1314  int ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
1315  if(ret > 0) {
1316    pid->outPos+=ret;
1317    if(pid->outPos==pid->outMSG.size()) {
1318      PacketID tmp=*pid;
1319      t_fdm->removeWriteFD(fd);
1320      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1321    }
1322  }
1323  else {  // error or EOF
1324    PacketID tmp(*pid);
1325    t_fdm->removeWriteFD(fd);
1326    string sent;
1327    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1328  }
1329}
1330
1331// resend event to everybody chained onto it
1332void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1333{
1334  if(iter->key.chain.empty())
1335    return;
1336  //  cerr<<"doResends called!\n";
1337  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1338    resend.fd=-1;
1339    resend.id=*i;
1340    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
1341
1342    MT->sendEvent(resend, &content);
1343    g_stats.chainResends++;
1344  }
1345}
1346
1347void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
1348{
1349  PacketID pid=any_cast<PacketID>(var);
1350  int len;
1351  char data[1500];
1352  ComboAddress fromaddr;
1353  socklen_t addrlen=sizeof(fromaddr);
1354
1355  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1356
1357  if(len < (int)sizeof(dnsheader)) {
1358    if(len < 0)
1359      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1360    else {
1361      g_stats.serverParseError++; 
1362      if(g_logCommonErrors)
1363        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< sockAddrToString((struct sockaddr_in*) &fromaddr) <<
1364          ": packet smalller than DNS header"<<endl;
1365    }
1366
1367    t_udpclientsocks->returnSocket(fd);
1368    string empty;
1369
1370    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1371    if(iter != MT->d_waiters.end()) 
1372      doResends(iter, pid, empty);
1373   
1374    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1375    return;
1376  } 
1377
1378  dnsheader dh;
1379  memcpy(&dh, data, sizeof(dh));
1380 
1381  if(dh.qr) {
1382    PacketID pident;
1383    pident.remote=fromaddr;
1384    pident.id=dh.id;
1385    pident.fd=fd;
1386    if(!dh.qdcount) { // UPC, Nominum, very old BIND on FormErr, NSD
1387      pident.domain.clear();
1388      pident.type = 0;
1389    }
1390    else {
1391      try {
1392        pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1393      }
1394      catch(std::exception& e) {
1395        g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
1396        L<<Logger::Warning<<"Error in packet from "<<sockAddrToString((struct sockaddr_in*) &fromaddr) << ": "<<e.what() << endl;
1397        return;
1398      }
1399    }
1400    string packet;
1401    packet.assign(data, len);
1402
1403    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1404    if(iter != MT->d_waiters.end()) {
1405      doResends(iter, pident, packet);
1406    }
1407
1408  retryWithName:
1409
1410    if(!MT->sendEvent(pident, &packet)) {
1411      // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
1412      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1413        if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
1414           pdns_iequals(pident.domain, mthread->key.domain)) {
1415          mthread->key.nearMisses++;
1416        }
1417
1418        // be a bit paranoid here since we're weakening our matching
1419        if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type && 
1420           pident.id  == mthread->key.id && mthread->key.remote == pident.remote) {
1421          // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
1422          pident.domain = mthread->key.domain;
1423          pident.type = mthread->key.type;
1424          goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
1425        }
1426      }
1427      g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
1428      if(g_logCommonErrors)
1429        L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<<pident.domain<<", "<<pident.type<<endl;
1430    }
1431    else if(fd >= 0) {
1432      t_udpclientsocks->returnSocket(fd);
1433    }
1434  }
1435  else
1436    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< sockAddrToString((struct sockaddr_in*) &fromaddr)  <<endl;
1437}
1438
1439FDMultiplexer* getMultiplexer()
1440{
1441  FDMultiplexer* ret;
1442  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1443      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1444    try {
1445      ret=i->second();
1446      return ret;
1447    }
1448    catch(FDMultiplexerException &fe) {
1449      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1450    }
1451    catch(...) {
1452      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1453    }
1454  }
1455  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1456  exit(1);
1457}
1458
1459 
1460void* doReloadLuaScript()
1461{
1462  string fname= ::arg()["lua-dns-script"];
1463  try {
1464    if(fname.empty()) {
1465      t_pdl->reset();
1466      L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
1467    }
1468    else {
1469      *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(fname));
1470    }
1471  }
1472  catch(std::exception& e) {
1473    L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
1474  }
1475   
1476  L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
1477  return 0;
1478}
1479
1480string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1481{
1482  if(begin != end) 
1483    ::arg().set("lua-dns-script") = *begin;
1484 
1485  broadcastFunction(doReloadLuaScript);
1486 
1487  return "ok, reload/unload queued\n";
1488} 
1489
1490void* recursorThread(void*);
1491
1492void* pleaseSupplantACLs(NetmaskGroup *ng)
1493{
1494  t_allowFrom = ng;
1495  return 0;
1496}
1497
1498void parseACLs()
1499{
1500  static bool l_initialized;
1501 
1502  if(l_initialized) { // only reload configuration file on second call
1503    string configname=::arg()["config-dir"]+"/recursor.conf";
1504    cleanSlashes(configname);
1505   
1506    if(!::arg().preParseFile(configname.c_str(), "allow-from-file")) 
1507      L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1508   
1509    ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
1510  }
1511
1512  NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
1513 
1514  if(!::arg()["allow-from-file"].empty()) {
1515    string line;
1516    ifstream ifs(::arg()["allow-from-file"].c_str());
1517    if(!ifs) {
1518      delete allowFrom; 
1519      throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
1520    }
1521
1522    string::size_type pos;
1523    while(getline(ifs,line)) {
1524      pos=line.find('#');
1525      if(pos!=string::npos)
1526        line.resize(pos);
1527      trim(line);
1528      if(line.empty())
1529        continue;
1530
1531      allowFrom->addMask(line);
1532    }
1533    L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
1534  }
1535  else if(!::arg()["allow-from"].empty()) {
1536    vector<string> ips;
1537    stringtok(ips, ::arg()["allow-from"], ", ");
1538   
1539    L<<Logger::Warning<<"Only allowing queries from: ";
1540    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1541      allowFrom->addMask(*i);
1542      if(i!=ips.begin())
1543        L<<Logger::Warning<<", ";
1544      L<<Logger::Warning<<*i;
1545    }
1546    L<<Logger::Warning<<endl;
1547  }
1548  else {
1549    if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53) 
1550      L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1551    delete allowFrom;
1552    allowFrom = 0;
1553  }
1554 
1555  g_initialAllowFrom = allowFrom;
1556  broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
1557  delete oldAllowFrom;
1558 
1559  l_initialized = true;
1560}
1561
1562int serviceMain(int argc, char*argv[])
1563{
1564  L.setName("pdns_recursor");
1565
1566  L.setLoglevel((Logger::Urgency)(6)); // info and up
1567
1568  if(!::arg()["logging-facility"].empty()) {
1569    boost::optional<int> val=logFacilityToLOG(::arg().asNum("logging-facility") );
1570    if(val)
1571      theL().setFacility(*val);
1572    else
1573      L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
1574  }
1575
1576  L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2010 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1577#ifdef __GNUC__
1578  L<<", gcc "__VERSION__;
1579#endif // add other compilers here
1580#ifdef _MSC_VER
1581  L<<", MSVC "<<_MSC_VER;
1582#endif
1583  L<<") starting up"<<endl;
1584 
1585  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1586    "This is free software, and you are welcome to redistribute it "
1587    "according to the terms of the GPL version 2."<<endl;
1588 
1589  L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1590 
1591  #if 0
1592  unsigned int maxFDs, curFDs;
1593  getFDLimits(curFDs, maxFDs);
1594  if(curFDs < 2048)
1595    L<<Logger::Warning<<"Only "<<curFDs<<" file descriptors available (out of: "<<maxFDs<<"), may not be suitable for high performance"<<endl;
1596  #endif
1597 
1598  seedRandom(::arg()["entropy-source"]);
1599
1600  parseACLs();
1601 
1602  if(!::arg()["dont-query"].empty()) {
1603    g_dontQuery=new NetmaskGroup;
1604    vector<string> ips;
1605    stringtok(ips, ::arg()["dont-query"], ", ");
1606    ips.push_back("0.0.0.0");
1607    ips.push_back("::");
1608
1609    L<<Logger::Warning<<"Will not send queries to: ";
1610    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1611      g_dontQuery->addMask(*i);
1612      if(i!=ips.begin())
1613        L<<Logger::Warning<<", ";
1614      L<<Logger::Warning<<*i;
1615    }
1616    L<<Logger::Warning<<endl;
1617  }
1618
1619  g_quiet=::arg().mustDo("quiet");
1620  if(::arg().mustDo("trace")) {
1621    SyncRes::setLog(true);
1622    ::arg().set("quiet")="no";
1623    g_quiet=false;
1624  }
1625 
1626  try {
1627    vector<string> addrs; 
1628    if(!::arg()["query-local-address6"].empty()) {
1629      SyncRes::s_doIPv6=true;
1630      L<<Logger::Error<<"Enabling IPv6 transport for outgoing queries"<<endl;
1631     
1632      stringtok(addrs, ::arg()["query-local-address6"], ", ;");
1633      BOOST_FOREACH(const string& addr, addrs) {
1634        g_localQueryAddresses6.push_back(ComboAddress(addr));
1635      }
1636    }
1637    addrs.clear();
1638    stringtok(addrs, ::arg()["query-local-address"], ", ;");
1639    BOOST_FOREACH(const string& addr, addrs) {
1640      g_localQueryAddresses4.push_back(ComboAddress(addr));
1641    }
1642  }
1643  catch(std::exception& e) {
1644    L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
1645    exit(99);
1646  }
1647 
1648  SyncRes::s_noEDNSPing = ::arg().mustDo("disable-edns-ping");
1649  SyncRes::s_noEDNS = ::arg().mustDo("disable-edns");
1650
1651  SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
1652
1653  SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1654  SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
1655  SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
1656  SyncRes::s_packetcacheservfailttl=::arg().asNum("packetcache-servfail-ttl");
1657  SyncRes::s_serverID=::arg()["server-id"];
1658  if(SyncRes::s_serverID.empty()) {
1659    char tmp[128];
1660    gethostname(tmp, sizeof(tmp)-1);
1661    SyncRes::s_serverID=tmp;
1662  }
1663 
1664  g_networkTimeoutMsec = ::arg().asNum("network-timeout");
1665
1666  g_initialDomainMap = parseAuthAndForwards();
1667 
1668   
1669  g_logCommonErrors=::arg().mustDo("log-common-errors");
1670 
1671  makeUDPServerSockets();
1672  makeTCPServerSockets();
1673
1674  s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
1675  if(!s_pidfname.empty())
1676    unlink(s_pidfname.c_str()); // remove possible old pid file
1677 
1678#ifndef WIN32
1679  if(::arg().mustDo("daemon")) {
1680    L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1681    L.toConsole(Logger::Critical);
1682    daemonize();
1683  }
1684  signal(SIGUSR1,usr1Handler);
1685  signal(SIGUSR2,usr2Handler);
1686  signal(SIGPIPE,SIG_IGN);
1687  writePid();
1688#endif
1689  makeControlChannelSocket();
1690 
1691  int newgid=0;
1692  if(!::arg()["setgid"].empty())
1693    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1694  int newuid=0;
1695  if(!::arg()["setuid"].empty())
1696    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1697
1698#ifndef WIN32
1699  if (!::arg()["chroot"].empty()) {
1700    if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
1701      L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1702      exit(1);
1703    }
1704  }
1705
1706  Utility::dropPrivs(newuid, newgid);
1707 
1708 
1709  g_numThreads = ::arg().asNum("threads");
1710 
1711  makeThreadPipes();
1712 
1713  g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1714  g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1715  g_maxMThreads=::arg().asNum("max-mthreads");
1716
1717  if(g_numThreads == 1) {
1718    L<<Logger::Warning<<"Operating unthreaded"<<endl;
1719    recursorThread(0);
1720  }
1721  else {
1722    pthread_t tid;
1723    L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
1724    for(unsigned int n=0; n < g_numThreads; ++n) {
1725      pthread_create(&tid, 0, recursorThread, (void*)n);
1726    }
1727    void* res;
1728
1729   
1730    pthread_join(tid, &res);
1731  }
1732  return 0;
1733}
1734
1735void* recursorThread(void* ptr)
1736try
1737{
1738  t_id=(int) (long) ptr;
1739  SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
1740  t_sstorage->domainmap = g_initialDomainMap;
1741  t_allowFrom = g_initialAllowFrom;
1742  t_udpclientsocks = new UDPClientSocks();
1743  t_tcpClientCounts = new tcpClientCounts_t();
1744  primeHints();
1745 
1746  t_packetCache = new RecursorPacketCache();
1747 
1748  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1749   
1750  t_RC->d_followRFC2181=::arg().mustDo("auth-can-lower-ttl");
1751  t_pdl = new shared_ptr<PowerDNSLua>();
1752 
1753  try {
1754    if(!::arg()["lua-dns-script"].empty()) {
1755      *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(::arg()["lua-dns-script"]));
1756      L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
1757    }
1758   
1759  }
1760  catch(std::exception &e) {
1761    L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
1762    exit(99);
1763  }
1764 
1765  t_remotes = new RemoteKeeper();
1766  t_remotes->remotes.resize(::arg().asNum("remotes-ringbuffer-entries") / g_numThreads); 
1767 
1768  if(!t_remotes->remotes.empty())
1769    memset(&t_remotes->remotes[0], 0, t_remotes->remotes.size() * sizeof(RemoteKeeper::remotes_t::value_type));
1770 
1771 
1772  MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));
1773 
1774  PacketID pident;
1775
1776  t_fdm=getMultiplexer();
1777  if(!t_id) 
1778    L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
1779
1780  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
1781
1782  for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
1783    t_fdm->addReadFD(i->first, i->second);
1784 
1785  if(!t_id) {
1786   
1787    t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1788  }
1789#endif
1790 
1791  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1792 
1793  bool listenOnTCP(true);
1794
1795  counter=0; // used to periodically execute certain tasks
1796  for(;;) {
1797    while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
1798     
1799    if(!(counter%500)) {
1800      MT->makeThread(houseKeeping, 0);
1801    }
1802
1803    if(!(counter%55)) {
1804      typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
1805      expired_t expired=t_fdm->getTimeouts(g_now);
1806       
1807      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1808        TCPConnection conn=any_cast<TCPConnection>(i->second);
1809        if(g_logCommonErrors)
1810          L<<Logger::Warning<<"Timeout from remote TCP client "<< conn.remote.toString() <<endl;
1811        t_fdm->removeReadFD(i->first);
1812        conn.closeAndCleanup();
1813      }
1814    }
1815     
1816    counter++;
1817
1818    if(!t_id && statsWanted) {
1819      doStats();
1820    }
1821
1822    Utility::gettimeofday(&g_now, 0);
1823    t_fdm->run(&g_now);
1824    // 'run' updates g_now for us
1825
1826    if(listenOnTCP) {
1827      if(TCPConnection::s_currentConnections > maxTcpClients) {  // shutdown, too many connections
1828        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1829          t_fdm->removeReadFD(*i);
1830        listenOnTCP=false;
1831      }
1832    }
1833    else {
1834      if(TCPConnection::s_currentConnections <= maxTcpClients) {  // reenable
1835        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1836          t_fdm->addReadFD(*i, handleNewTCPQuestion);
1837        listenOnTCP=true;
1838      }
1839    }
1840  }
1841}
1842catch(AhuException &ae) {
1843  L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1844  return 0;
1845}
1846catch(std::exception &e) {
1847   L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1848   return 0;
1849}
1850catch(...) {
1851   L<<Logger::Error<<"any other exception in main: "<<endl;
1852   return 0;
1853}
1854
1855#ifdef WIN32
1856void doWindowsServiceArguments(RecursorService& recursor)
1857{
1858  if(::arg().mustDo( "register-service" )) {
1859    if ( !recursor.registerService( "The PowerDNS Recursor.", true )) {
1860      cerr << "Could not register service." << endl;
1861      exit( 99 );
1862    }
1863   
1864    exit( 0 );
1865  }
1866
1867  if ( ::arg().mustDo( "unregister-service" )) {
1868    recursor.unregisterService();
1869    exit( 0 );
1870  }
1871}
1872#endif
1873
1874
1875int main(int argc, char **argv) 
1876{
1877  g_stats.startupTime=time(0);
1878  reportBasicTypes();
1879
1880  int ret = EXIT_SUCCESS;
1881#ifdef WIN32
1882  RecursorService service;
1883  WSADATA wsaData;
1884  if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
1885    cerr<<"Unable to initialize winsock\n";
1886    exit(1);
1887  }
1888#endif // WIN32
1889
1890  try {
1891    ::arg().set("stack-size","stack size per mthread")="200000";
1892    ::arg().set("soa-minimum-ttl","Don't change")="0";
1893    ::arg().set("soa-serial-offset","Don't change")="0";
1894    ::arg().set("no-shuffle","Don't change")="off";
1895    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1896    ::arg().set("local-port","port to listen on")="53";
1897    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
1898    ::arg().set("trace","if we should output heaps of logging")="off";
1899    ::arg().set("daemon","Operate as a daemon")="yes";
1900    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1901    ::arg().set("chroot","switch to chroot jail")="";
1902    ::arg().set("setgid","If set, change group id to this gid for more security")="";
1903    ::arg().set("setuid","If set, change user id to this uid for more security")="";
1904    ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
1905    ::arg().set("threads", "Launch this number of threads")="2";
1906#ifdef WIN32
1907    ::arg().set("quiet","Suppress logging of questions and answers")="off";
1908    ::arg().setSwitch( "register-service", "Register the service" )= "no";
1909    ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
1910    ::arg().setSwitch( "ntservice", "Run as service" )= "no";
1911    ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes"; 
1912    ::arg().setSwitch( "use-logfile", "Use a log file" )= "no"; 
1913    ::arg().setSwitch( "logfile", "Filename of the log file" )= "recursor.log"; 
1914#else
1915    ::arg().set("quiet","Suppress logging of questions and answers")="";
1916    ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
1917#endif
1918    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
1919#ifndef WIN32
1920    ::arg().set("socket-owner","Owner of socket")="";
1921    ::arg().set("socket-group","Group of socket")="";
1922    ::arg().set("socket-mode", "Permissions for socket")="";
1923#endif
1924   
1925    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
1926    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
1927    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
1928    ::arg().set("query-local-address6","Source IPv6 address for sending queries")="";
1929    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
1930    ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
1931    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
1932    ::arg().set("hint-file", "If set, load root hints from this file")="";
1933    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
1934    ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
1935    ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
1936    ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
1937    ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
1938    ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
1939    ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
1940    ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
1941    ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
1942    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
1943    ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
1944    ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
1945    ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=LOCAL_NETS; 
1946    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
1947    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
1948    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
1949    ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
1950    ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
1951    ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
1952    ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
1953    ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
1954    ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
1955    ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
1956    ::arg().set("auth-can-lower-ttl", "If we follow RFC 2181 to the letter, an authoritative server can lower the TTL of NS records")="off";
1957    ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
1958    ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatability" )= "off"; 
1959    ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; 
1960    ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; 
1961    ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no"; 
1962
1963    ::arg().setCmd("help","Provide a helpful message");
1964    ::arg().setCmd("version","Print version string ("VERSION")");
1965    ::arg().setCmd("config","Output blank configuration");
1966    L.toConsole(Logger::Info);
1967    ::arg().laxParse(argc,argv); // do a lax parse
1968
1969    if(::arg().mustDo("config")) {
1970      cout<<::arg().configstring()<<endl;
1971      exit(0);
1972    }
1973
1974
1975    string configname=::arg()["config-dir"]+"/recursor.conf";
1976    cleanSlashes(configname);
1977
1978    if(!::arg().file(configname.c_str())) 
1979      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
1980
1981    ::arg().parse(argc,argv);
1982
1983    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
1984
1985    if(::arg().mustDo("help")) {
1986      cerr<<"syntax:"<<endl<<endl;
1987      cerr<<::arg().helpstring(::arg()["help"])<<endl;
1988      exit(99);
1989    }
1990    if(::arg().mustDo("version")) {
1991      cerr<<"version: "VERSION<<endl;
1992      exit(99);
1993    }
1994
1995#ifndef WIN32
1996    serviceMain(argc, argv);
1997#else
1998    doWindowsServiceArguments(service);
1999        L.toNTLog();
2000    RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" )); 
2001#endif
2002
2003  }
2004  catch(AhuException &ae) {
2005    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2006    ret=EXIT_FAILURE;
2007  }
2008  catch(std::exception &e) {
2009    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2010    ret=EXIT_FAILURE;
2011  }
2012  catch(...) {
2013    L<<Logger::Error<<"any other exception in main: "<<endl;
2014    ret=EXIT_FAILURE;
2015  }
2016 
2017#ifdef WIN32
2018  WSACleanup();
2019#endif // WIN32
2020
2021  return ret;
2022}
Note: See TracBrowser for help on using the browser.