root/trunk/pdns/pdns/pdns_recursor.cc @ 2544

Revision 2544, 66.6 KB (checked in by ahu, 14 months ago)

implement suggestion of ticket 442, export-etc-hosts-suffix

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2012  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#ifndef WIN32
20# include <netdb.h>
21# include <sys/stat.h>
22# include <unistd.h>
23#else
24 #include "ntservice.hh"
25 #include "recursorservice.hh"
26#endif // WIN32
27
28#include <boost/foreach.hpp>
29
30#include <pthread.h>
31#include "recpacketcache.hh"
32#include "utility.hh"
33#include "dns_random.hh"
34#include <iostream>
35#include <errno.h>
36#include <map>
37#include <set>
38#include "recursor_cache.hh"
39#include "cachecleaner.hh"
40#include <stdio.h>
41#include <signal.h>
42#include <stdlib.h>
43#include "misc.hh"
44#include "mtasker.hh"
45#include <utility>
46#include "arguments.hh"
47#include "syncres.hh"
48#include <fcntl.h>
49#include <fstream>
50#include "sstuff.hh"
51#include <boost/tuple/tuple.hpp>
52#include <boost/tuple/tuple_comparison.hpp>
53#include <boost/shared_array.hpp>
54#include <boost/lexical_cast.hpp>
55#include <boost/function.hpp>
56#include <boost/algorithm/string.hpp>
57#include <netinet/tcp.h>
58#include "dnsparser.hh"
59#include "dnswriter.hh"
60#include "dnsrecords.hh"
61#include "zoneparser-tng.hh"
62#include "rec_channel.hh"
63#include "logger.hh"
64#include "iputils.hh"
65#include "mplexer.hh"
66#include "config.h"
67#include "lua-pdns-recursor.hh"
68
69#ifndef RECURSOR
70#include "statbag.hh"
71StatBag S;
72#endif
73
74__thread FDMultiplexer* t_fdm;
75__thread unsigned int t_id;
76unsigned int g_maxTCPPerClient;
77unsigned int g_networkTimeoutMsec;
78bool g_logCommonErrors;
79__thread shared_ptr<PowerDNSLua>* t_pdl;
80__thread RemoteKeeper* t_remotes;
81
82RecursorControlChannel s_rcc; // only active in thread 0
83
//! Descriptors used for communicating with one of our threads.
// NOTE(review): the names suggest two pipes (main -> thread and thread -> main),
// each stored as a write end and a read end -- confirm against the code that fills g_pipes.
struct ThreadPipeSet
{
  int writeToThread, readToThread;
  int writeFromThread, readFromThread;
};
92
93vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
94
95SyncRes::domainmap_t* g_initialDomainMap; // new threads needs this to be setup
96
97#include "namespaces.hh"
98
99__thread MemRecursorCache* t_RC;
100__thread RecursorPacketCache* t_packetCache;
101RecursorStats g_stats;
102bool g_quiet;
103
104bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
105
106static __thread NetmaskGroup* t_allowFrom;
107static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this
108
109NetmaskGroup* g_dontQuery;
110string s_programname="pdns_recursor";
111
112typedef vector<int> tcpListenSockets_t;
113tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
114int g_tcpTimeout;
115unsigned int g_maxMThreads;
116struct timeval g_now; // timestamp, updated (too) frequently
117map<int, ComboAddress> g_listenSocketsAddresses; // is shared across all threads right now
118
119__thread MT_t* MT; // the big MTasker
120
121unsigned int g_numThreads;
122
123#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10"
124
125//! used to send information to a newborn mthread
126struct DNSComboWriter {
127  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), 
128                                                                                                        d_tcp(false), d_socket(-1)
129  {}
130  MOADNSParser d_mdp;
131  void setRemote(const ComboAddress* sa)
132  {
133    d_remote=*sa;
134  }
135
136  void setSocket(int sock)
137  {
138    d_socket=sock;
139  }
140
141  string getRemote() const
142  {
143    return d_remote.toString();
144  }
145
146  struct timeval d_now;
147  ComboAddress d_remote;
148  bool d_tcp;
149  int d_socket;
150  shared_ptr<TCPConnection> d_tcpConnection;
151};
152
153
154ArgvMap &arg()
155{
156  static ArgvMap theArg;
157  return theArg;
158}
159
160
161void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
162
163// -1 is error, 0 is timeout, 1 is success
164int asendtcp(const string& data, Socket* sock) 
165{
166  PacketID pident;
167  pident.sock=sock;
168  pident.outMSG=data;
169 
170  t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
171  string packet;
172
173  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
174
175  if(!ret || ret==-1) { // timeout
176    t_fdm->removeWriteFD(sock->getHandle());
177  }
178  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
179    return -1;
180  }
181  return ret;
182}
183
184void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
185
186// -1 is error, 0 is timeout, 1 is success
187int arecvtcp(string& data, int len, Socket* sock) 
188{
189  data.clear();
190  PacketID pident;
191  pident.sock=sock;
192  pident.inNeeded=len;
193  t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
194
195  int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
196  if(!ret || ret==-1) { // timeout
197    t_fdm->removeReadFD(sock->getHandle());
198  }
199  else if(data.empty()) {// error, EOF or other
200    return -1;
201  }
202
203  return ret;
204}
205
206vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6; 
207const ComboAddress g_local4("0.0.0.0"), g_local6("::");
208
209//! pick a random query local address
210ComboAddress getQueryLocalAddress(int family, uint16_t port)
211{
212  ComboAddress ret;
213  if(family==AF_INET) {
214    if(g_localQueryAddresses4.empty()) 
215      ret = g_local4;
216    else 
217      ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
218    ret.sin4.sin_port = htons(port);
219  }
220  else {
221    if(g_localQueryAddresses6.empty())
222      ret = g_local6;
223    else
224      ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
225     
226    ret.sin6.sin6_port = htons(port);
227  }
228  return ret;
229}
230
231void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
232
233void setSocketBuffer(int fd, int optname, uint32_t size)
234{
235  uint32_t psize=0;
236  socklen_t len=sizeof(psize);
237 
238  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
239    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
240    return; 
241  }
242
243  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
244    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
245}
246
247
248static void setSocketReceiveBuffer(int fd, uint32_t size)
249{
250  setSocketBuffer(fd, SO_RCVBUF, size);
251}
252
253static void setSocketSendBuffer(int fd, uint32_t size)
254{
255  setSocketBuffer(fd, SO_SNDBUF, size);
256}
257
258
259// you can ask this class for a UDP socket to send a query from
260// this socket is not yours, don't even think about deleting it
261// but after you call 'returnSocket' on it, don't assume anything anymore
262class UDPClientSocks
263{
264  unsigned int d_numsocks;
265  unsigned int d_maxsocks;
266public:
267  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
268  {
269  }
270
271  typedef set<int> socks_t;
272  socks_t d_socks;
273
274  // returning -1 means: temporary OS error (ie, out of files), -2 means OS error
275  int getSocket(const ComboAddress& toaddr, int* fd)
276  {
277    *fd=makeClientSocket(toaddr.sin4.sin_family);
278    if(*fd < 0) // temporary error - receive exception otherwise
279      return -1;
280
281    if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
282      int err = errno;
283      //      returnSocket(*fd);
284      Utility::closesocket(*fd);
285      if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
286        return -2;
287      return -1;
288    }
289
290    d_socks.insert(*fd);
291    d_numsocks++;
292    return 0;
293  }
294
295  void returnSocket(int fd)
296  {
297    socks_t::iterator i=d_socks.find(fd);
298    if(i==d_socks.end()) {
299      throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
300    }
301    returnSocketLocked(i);
302  }
303
304  // return a socket to the pool, or simply erase it
305  void returnSocketLocked(socks_t::iterator& i)
306  {
307    if(i==d_socks.end()) {
308      throw AhuException("Trying to return a socket not in the pool");
309    }
310    try {
311      t_fdm->removeReadFD(*i);
312    }
313    catch(FDMultiplexerException& e) {
314      // we sometimes return a socket that has not yet been assigned to t_fdm
315    }
316    Utility::closesocket(*i);
317   
318    d_socks.erase(i++);
319    --d_numsocks;
320  }
321
322  // returns -1 for errors which might go away, throws for ones that won't
323  static int makeClientSocket(int family)
324  {
325    int ret=(int)socket(family, SOCK_DGRAM, 0);
326    Utility::setCloseOnExec(ret);
327
328    if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
329      return ret;
330   
331    if(ret<0) 
332      throw AhuException("Making a socket for resolver: "+stringerror());
333
334   
335    int tries=10;
336    while(--tries) {
337      uint16_t port;
338     
339      if(tries==1)  // fall back to kernel 'random'
340        port = 0;
341      else
342        port = 1025 + dns_random(64510);
343     
344      ComboAddress sin=getQueryLocalAddress(family, port); // does htons for us
345
346      if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0) 
347        break;
348    }
349    if(!tries)
350      throw AhuException("Resolver binding to local query client socket: "+stringerror());
351   
352    Utility::setNonBlocking(ret);
353    return ret;
354  }
355};
356
357static __thread UDPClientSocks* t_udpclientsocks;
358
359/* these two functions are used by LWRes */
360// -2 is OS error, -1 is error that depends on the remote, > 0 is success
361int asendto(const char *data, int len, int flags, 
362            const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd) 
363{
364
365  PacketID pident;
366  pident.domain = domain;
367  pident.remote = toaddr;
368  pident.type = qtype;
369
370  // see if there is an existing outstanding request we can chain on to, using partial equivalence function
371  pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
372
373  for(; chain.first != chain.second; chain.first++) {
374    if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
375      /*
376      cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
377      cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
378          <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
379      */
380      chain.first->key.chain.insert(id); // we can chain
381      *fd=-1;                            // gets used in waitEvent / sendEvent later on
382      return 1;
383    }
384  }
385
386  int ret=t_udpclientsocks->getSocket(toaddr, fd);
387  if(ret < 0)
388    return ret;
389
390  pident.fd=*fd;
391  pident.id=id;
392 
393  t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
394  ret = send(*fd, data, len, 0);
395
396  int tmp = errno;
397
398  if(ret < 0)
399    t_udpclientsocks->returnSocket(*fd);
400
401  errno = tmp; // this is for logging purposes only
402  return ret;
403}
404
405// -1 is error, 0 is timeout, 1 is success
406int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len, 
407              uint16_t id, const string& domain, uint16_t qtype, int fd, struct timeval* now)
408{
409  static optional<unsigned int> nearMissLimit;
410  if(!nearMissLimit) 
411    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
412
413  PacketID pident;
414  pident.fd=fd;
415  pident.id=id;
416  pident.domain=domain;
417  pident.type = qtype;
418  pident.remote=fromaddr;
419
420  string packet;
421  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
422
423  if(ret > 0) {
424    if(packet.empty()) // means "error"
425      return -1; 
426
427    *d_len=(int)packet.size();
428    memcpy(data,packet.c_str(),min(len,*d_len));
429    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
430      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
431      g_stats.spoofCount++;
432      return -1;
433    }
434  }
435  else {
436    if(fd >= 0)
437      t_udpclientsocks->returnSocket(fd);
438  }
439  return ret;
440}
441
442
443string s_pidfname;
444static void writePid(void)
445{
446  ofstream of(s_pidfname.c_str(), std::ios_base::app);
447  if(of)
448    of<< Utility::getpid() <<endl;
449  else
450    L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
451}
452
453typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
454tcpClientCounts_t __thread* t_tcpClientCounts;
455
456TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
457{ 
458  ++s_currentConnections; 
459  (*t_tcpClientCounts)[d_remote]++;
460}
461
462TCPConnection::~TCPConnection()
463{
464  if(Utility::closesocket(d_fd) < 0) 
465    unixDie("closing socket for TCPConnection");
466  if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--) 
467    t_tcpClientCounts->erase(d_remote);
468  --s_currentConnections;
469}
470
471AtomicCounter TCPConnection::s_currentConnections; 
472void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
473
474void updateRcodeStats(int res)
475{
476  switch(res) {
477  case RCode::ServFail:
478    g_stats.servFails++;
479    break;
480  case RCode::NXDomain:
481    g_stats.nxDomains++;
482    break;
483  case RCode::NoError:
484    g_stats.noErrors++;
485    break;
486  }
487}
488
489void startDoResolve(void *p)
490{
491  DNSComboWriter* dc=(DNSComboWriter *)p;
492
493  try {
494    uint32_t maxanswersize= dc->d_tcp ? 65535 : 512;
495    EDNSOpts edo;
496    if(getEDNSOpts(dc->d_mdp, &edo)) {
497      maxanswersize = min(edo.d_packetsize, (uint16_t) (dc->d_tcp ? 65535 : 1680));
498    }
499   
500    vector<DNSResourceRecord> ret;
501    vector<uint8_t> packet;
502
503    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); 
504
505    pw.getHeader()->aa=0;
506    pw.getHeader()->ra=1;
507    pw.getHeader()->qr=1;
508    pw.getHeader()->tc=0;
509    pw.getHeader()->id=dc->d_mdp.d_header.id;
510    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
511
512    SyncRes sr(dc->d_now);
513    if(!g_quiet)
514      L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
515       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
516
517    sr.setId(MT->getTid());
518    if(!dc->d_mdp.d_header.rd)
519      sr.setCacheOnly();
520
521    int res;
522
523    bool variableAnswer = false;
524    // if there is a PowerDNSLua active, and it 'took' the query in preResolve, we don't launch beginResolve
525    if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer)) {
526       res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
527
528      if(t_pdl->get()) {
529        if(res == RCode::NoError) {
530          vector<DNSResourceRecord>::const_iterator i;
531          for(i=ret.begin(); i!=ret.end(); ++i) 
532              if(i->qtype.getCode() == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
533                        break;
534          if(i == ret.end())
535            (*t_pdl)->nodata(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
536        }
537        else if(res == RCode::NXDomain)
538          (*t_pdl)->nxdomain(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
539     
540      (*t_pdl)->postresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
541      }
542    }
543
544
545   
546    uint32_t minTTL=std::numeric_limits<uint32_t>::max();
547    if(res < 0) {
548      pw.getHeader()->rcode=RCode::ServFail;
549      // no commit here, because no record
550      g_stats.servFails++;
551    }
552    else {
553      pw.getHeader()->rcode=res;
554      updateRcodeStats(res);
555   
556      if(ret.size()) {
557        shuffle(ret);
558       
559        for(vector<DNSResourceRecord>::const_iterator i=ret.begin(); i!=ret.end(); ++i) {
560          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place); 
561          minTTL = min(minTTL, i->ttl);
562          if(i->qtype.getCode() == QType::A) { // blast out A record w/o doing whole dnswriter thing
563            uint32_t ip=0;
564            IpToU32(i->content, &ip);
565            pw.xfr32BitInt(htonl(ip));
566          } else {
567            shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content)); 
568            drc->toPacket(pw);
569          }
570          if(pw.size() > maxanswersize) {
571            pw.rollback();
572            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
573              pw.getHeader()->tc=1;
574            goto sendit; // need to jump over pw.commit
575          }
576        }
577
578      pw.commit();
579      }
580    }
581  sendit:;
582    if(!dc->d_tcp) {
583      sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
584      if(!SyncRes::s_nopacketcache && !variableAnswer ) {
585        t_packetCache->insertResponsePacket(string((const char*)&*packet.begin(), packet.size()), g_now.tv_sec, 
586                                           min(minTTL, 
587                                               (pw.getHeader()->rcode == RCode::ServFail) ? SyncRes::s_packetcacheservfailttl : SyncRes::s_packetcachettl
588                                               ) 
589                                          );
590      }
591    }
592    else {
593      char buf[2];
594      buf[0]=packet.size()/256;
595      buf[1]=packet.size()%256;
596
597      Utility::iovec iov[2];
598
599      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
600      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
601
602      int ret=Utility::writev(dc->d_socket, iov, 2);
603      bool hadError=true;
604
605      if(ret == 0) 
606        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
607      else if(ret < 0 ) 
608        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
609      else if((unsigned int)ret != 2 + packet.size())
610        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
611      else
612        hadError=false;
613     
614      // update tcp connection status, either by closing or moving to 'BYTE0'
615   
616      if(hadError) {
617        // no need to remove us from FDM, we weren't there
618        dc->d_socket = -1;
619      }
620      else {
621        dc->d_tcpConnection->state=TCPConnection::BYTE0;
622        Utility::gettimeofday(&g_now, 0); // needs to be updated
623        t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
624        t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
625      }
626    }
627   
628    if(!g_quiet) {
629      L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
630      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
631      sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
632    }
633
634    sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++; 
635    float spent=makeFloat(sr.d_now-dc->d_now);
636    if(spent < 0.001)
637      g_stats.answers0_1++;
638    else if(spent < 0.010)
639      g_stats.answers1_10++;
640    else if(spent < 0.1)
641      g_stats.answers10_100++;
642    else if(spent < 1.0)
643      g_stats.answers100_1000++;
644    else
645      g_stats.answersSlow++;
646
647    uint64_t newLat=(uint64_t)(spent*1000000);
648    if(newLat < 1000000)  // outliers of several minutes exist..
649      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
650
651    delete dc;
652    dc=0;
653  }
654  catch(AhuException &ae) {
655    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
656    delete dc;
657  }
658  catch(MOADNSException& e) {
659    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
660    delete dc;
661  }
662  catch(std::exception& e) {
663    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
664    delete dc;
665  }
666  catch(...) {
667    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
668  }
669 
670  g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
671}
672
673void makeControlChannelSocket(int processNum=-1)
674{
675  string sockname=::arg()["socket-dir"]+"/pdns_recursor";
676  if(processNum >= 0)
677    sockname += "."+lexical_cast<string>(processNum);
678  sockname+=".controlsocket";
679  s_rcc.listen(sockname);
680 
681#ifndef WIN32
682  int sockowner = -1;
683  int sockgroup = -1;
684
685  if (!::arg().isEmpty("socket-group"))
686    sockgroup=::arg().asGid("socket-group");
687  if (!::arg().isEmpty("socket-owner"))
688    sockowner=::arg().asUid("socket-owner");
689 
690  if (sockgroup > -1 || sockowner > -1) {
691    if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
692      unixDie("Failed to chown control socket");
693    }
694  }
695
696  // do mode change if socket-mode is given
697  if(!::arg().isEmpty("socket-mode")) {
698    mode_t sockmode=::arg().asMode("socket-mode");
699    chmod(sockname.c_str(), sockmode);
700  }
701#endif
702}
703
704void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
705{
706  shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
707
708  if(conn->state==TCPConnection::BYTE0) {
709    int bytes=recv(conn->getFD(), conn->data, 2, 0);
710    if(bytes==1)
711      conn->state=TCPConnection::BYTE1;
712    if(bytes==2) { 
713      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
714      conn->bytesread=0;
715      conn->state=TCPConnection::GETQUESTION;
716    }
717    if(!bytes || bytes < 0) {
718      t_fdm->removeReadFD(fd);
719      return;
720    }
721  }
722  else if(conn->state==TCPConnection::BYTE1) {
723    int bytes=recv(conn->getFD(), conn->data+1, 1, 0);
724    if(bytes==1) {
725      conn->state=TCPConnection::GETQUESTION;
726      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
727      conn->bytesread=0;
728    }
729    if(!bytes || bytes < 0) {
730      if(g_logCommonErrors)
731        L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
732      t_fdm->removeReadFD(fd);
733      return;
734    }
735  }
736  else if(conn->state==TCPConnection::GETQUESTION) {
737    int bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
738    if(!bytes || bytes < 0) {
739      L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
740      t_fdm->removeReadFD(fd);
741      return;
742    }
743    conn->bytesread+=bytes;
744    if(conn->bytesread==conn->qlen) {
745      t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
746
747      DNSComboWriter* dc=0;
748      try {
749        dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
750      }
751      catch(MOADNSException &mde) {
752        g_stats.clientParseError++; 
753        if(g_logCommonErrors)
754          L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
755        return;
756      }
757      dc->d_tcpConnection = conn; // carry the torch
758      dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
759      dc->d_tcp=true;
760      dc->setRemote(&conn->d_remote);
761      if(dc->d_mdp.d_header.qr) {
762        delete dc;
763        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
764        return;
765      }
766      else {
767        ++g_stats.qcounter;
768        ++g_stats.tcpqcounter;
769        MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
770        return;
771      }
772    }
773  }
774}
775
776//! Handle new incoming TCP connection
777void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
778{
779  ComboAddress addr;
780  socklen_t addrlen=sizeof(addr);
781  int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
782  if(newsock>0) {
783    if(MT->numProcesses() > g_maxMThreads) {
784      g_stats.overCapacityDrops++;
785      Utility::closesocket(newsock);
786      return;
787    }
788
789    t_remotes->addRemote(addr);
790    if(t_allowFrom && !t_allowFrom->match(&addr)) {
791      if(!g_quiet) 
792        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
793
794      g_stats.unauthorizedTCP++;
795      Utility::closesocket(newsock);
796      return;
797    }
798    if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
799      g_stats.tcpClientOverflow++;
800      Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
801      return;
802    }
803   
804    Utility::setNonBlocking(newsock);
805    shared_ptr<TCPConnection> tc(new TCPConnection(newsock, addr));
806    tc->state=TCPConnection::BYTE0;
807   
808    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
809
810    struct timeval now;
811    Utility::gettimeofday(&now, 0);
812    t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
813  }
814}
815 
816string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
817{
818  ++g_stats.qcounter;
819
820  string response;
821  try {
822    uint32_t age;
823    if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
824      if(!g_quiet)
825        L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
826
827      g_stats.packetCacheHits++;
828      SyncRes::s_queries++;
829      ageDNSPacket(response, age);
830      sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
831      if(response.length() >= sizeof(struct dnsheader)) {
832        struct dnsheader dh;
833        memcpy(&dh, response.c_str(), sizeof(dh));
834        updateRcodeStats(dh.rcode);
835      }
836      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
837      return 0;
838    }
839  } 
840  catch(std::exception& e) {
841    L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
842    return 0;
843  }
844 
845 
846  if(MT->numProcesses() > g_maxMThreads) {
847    g_stats.overCapacityDrops++;
848    return 0;
849  }
850 
851  DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
852  dc->setSocket(fd);
853  dc->setRemote(&fromaddr);
854
855  dc->d_tcp=false;
856  MT->makeThread(startDoResolve, (void*) dc); // deletes dc
857  return 0;
858} 
859 
860void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
861{
862  int len;
863  char data[1500];
864  ComboAddress fromaddr;
865  socklen_t addrlen=sizeof(fromaddr);
866 
867  if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
868    t_remotes->addRemote(fromaddr);
869
870    if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
871      if(!g_quiet) 
872        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
873
874      g_stats.unauthorizedUDP++;
875      return;
876    }
877    try {
878      dnsheader* dh=(dnsheader*)data;
879     
880      if(dh->qr) {
881        if(g_logCommonErrors)
882          L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
883      }
884      else {
885        string question(data, len);
886        if(g_weDistributeQueries)
887          distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
888        else
889          doProcessUDPQuestion(question, fromaddr, fd);
890      }
891    }
892    catch(MOADNSException& mde) {
893      g_stats.clientParseError++; 
894      if(g_logCommonErrors)
895        L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
896    }
897  }
898  else {
899    // cerr<<t_id<<" had error: "<<stringerror()<<endl;
900    if(errno == EAGAIN)
901      g_stats.noPacketError++;
902  }
903}
904
905
906typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
907deferredAdd_t deferredAdd;
908
909void makeTCPServerSockets()
910{
911  int fd;
912  vector<string>locals;
913  stringtok(locals,::arg()["local-address"]," ,");
914
915  if(locals.empty())
916    throw AhuException("No local address specified");
917 
918  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
919    ServiceTuple st;
920    st.port=::arg().asNum("local-port");
921    parseService(*i, st);
922   
923    ComboAddress sin;
924
925    memset((char *)&sin,0, sizeof(sin));
926    sin.sin4.sin_family = AF_INET;
927    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
928      sin.sin6.sin6_family = AF_INET6;
929      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
930        throw AhuException("Unable to resolve local address for TCP server on '"+ st.host +"'"); 
931    }
932
933    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
934    Utility::setCloseOnExec(fd);
935
936    if(fd<0) 
937      throw AhuException("Making a TCP server socket for resolver: "+stringerror());
938
939    int tmp=1;
940    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
941      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
942      exit(1);
943    }
944   
945#ifdef TCP_DEFER_ACCEPT
946    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
947      if(i==locals.begin())
948        L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
949    }
950#endif
951
952    sin.sin4.sin_port = htons(st.port);
953    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
954    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) 
955      throw AhuException("Binding TCP server socket for "+ st.host +": "+stringerror());
956   
957    Utility::setNonBlocking(fd);
958    setSocketSendBuffer(fd, 65000);
959    listen(fd, 128);
960    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
961    g_tcpListenSockets.push_back(fd);
962
963    if(sin.sin4.sin_family == AF_INET) 
964      L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
965    else
966      L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
967  }
968}
969
970
971
972void makeUDPServerSockets()
973{
974  vector<string>locals;
975  stringtok(locals,::arg()["local-address"]," ,");
976
977  if(locals.empty())
978    throw AhuException("No local address specified");
979 
980  if(::arg()["local-address"]=="0.0.0.0") {
981    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
982  }
983
984  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
985    ServiceTuple st;
986    st.port=::arg().asNum("local-port");
987    parseService(*i, st);
988
989    ComboAddress sin;
990
991    memset(&sin, 0, sizeof(sin));
992    sin.sin4.sin_family = AF_INET;
993    if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
994      sin.sin6.sin6_family = AF_INET6;
995      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
996        throw AhuException("Unable to resolve local address for UDP server on '"+ st.host +"'"); 
997    }
998   
999    int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
1000    Utility::setCloseOnExec(fd);
1001
1002    if(fd < 0) {
1003      throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
1004    }
1005
1006    setSocketReceiveBuffer(fd, 200000);
1007    sin.sin4.sin_port = htons(st.port);
1008
1009    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1010    if (::bind(fd, (struct sockaddr *)&sin, socklen)<0) 
1011      throw AhuException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
1012   
1013    Utility::setNonBlocking(fd);
1014
1015    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
1016    g_listenSocketsAddresses[fd]=sin;  // this is written to only from the startup thread, not from the workers
1017    if(sin.sin4.sin_family == AF_INET) 
1018      L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
1019    else
1020      L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1021  }
1022}
1023
1024
1025#ifndef WIN32
1026void daemonize(void)
1027{
1028  if(fork())
1029    exit(0); // bye bye
1030 
1031  setsid(); 
1032
1033  int i=open("/dev/null",O_RDWR); /* open stdin */
1034  if(i < 0) 
1035    L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1036  else {
1037    dup2(i,0); /* stdin */
1038    dup2(i,1); /* stderr */
1039    dup2(i,2); /* stderr */
1040    close(i);
1041  }
1042}
1043#endif
1044
// Main-loop iteration counter; recursorThread() resets it and uses it to pace
// periodic work.
uint64_t counter = 0;

// Set by usr1Handler(), cleared again at the end of doStats().
bool statsWanted = false;
1047
1048void usr1Handler(int)
1049{
1050  statsWanted=true;
1051}
1052
1053void usr2Handler(int)
1054{
1055  SyncRes::setLog(true);
1056  g_quiet=false;
1057  ::arg().set("quiet")="no";
1058
1059}
1060
1061void doStats(void)
1062{
1063  static time_t lastOutputTime;
1064  static uint64_t lastQueryCount;
1065 
1066  if(g_stats.qcounter && (t_RC->cacheHits + t_RC->cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {  // this only runs once thread 0 has had hits
1067    uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
1068    uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
1069   
1070    L<<Logger::Warning<<"stats: "<<g_stats.qcounter<<" questions, "<<
1071      broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
1072      broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
1073      (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl; 
1074   
1075    L<<Logger::Warning<<"stats: throttle map: "
1076      << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
1077      << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl; 
1078    L<<Logger::Warning<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
1079    L<<Logger::Warning<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
1080     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
1081    L<<Logger::Warning<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
1082      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
1083
1084    //L<<Logger::Warning<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
1085      //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
1086   
1087    L<<Logger::Warning<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
1088    " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
1089   
1090    time_t now = time(0);
1091    if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
1092      L<<Logger::Warning<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
1093    }
1094    lastOutputTime = now;
1095    lastQueryCount = SyncRes::s_queries;
1096  }
1097  else if(statsWanted) 
1098    L<<Logger::Warning<<"stats: no stats yet!"<<endl;
1099
1100  statsWanted=false;
1101}
1102
1103static void houseKeeping(void *)
1104try
1105{
1106  static __thread time_t last_stat, last_rootupdate, last_prune;
1107  static __thread int cleanCounter=0;
1108  struct timeval now;
1109  Utility::gettimeofday(&now, 0);
1110
1111  // clog<<"* "<<t_id<<" "<<(void*)&last_stat<<"\t"<<(unsigned int)last_stat<<endl;
1112
1113  if(now.tv_sec - last_prune > (time_t)(5 + t_id)) { 
1114    DTime dt;
1115    dt.setTimeval(now);
1116    t_RC->doPrune(); // this function is local to a thread, so fine anyhow
1117    t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numThreads);
1118   
1119    pruneCollection(t_sstorage->negcache, ::arg().asNum("max-cache-entries") / (g_numThreads * 10), 200);
1120   
1121    if(!((cleanCounter++)%40)) {  // this is a full scan!
1122      time_t limit=now.tv_sec-300;
1123      for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
1124        if(i->second.stale(limit))
1125          t_sstorage->nsSpeeds.erase(i++);
1126        else
1127          ++i;
1128    }
1129//    L<<Logger::Warning<<"Spent "<<dt.udiff()/1000<<" msec cleaning"<<endl;
1130    last_prune=time(0);
1131  }
1132 
1133  if(!t_id) {
1134    if(now.tv_sec - last_stat > 1800) { 
1135      doStats();
1136      last_stat=time(0);
1137    }
1138  }
1139 
1140  if(now.tv_sec - last_rootupdate > 7200) {
1141    SyncRes sr(now);
1142    sr.setDoEDNS0(true);
1143    vector<DNSResourceRecord> ret;
1144
1145    sr.setNoCache();
1146    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
1147    if(!res) {
1148      L<<Logger::Warning<<"Refreshed . records"<<endl;
1149      last_rootupdate=now.tv_sec;
1150    }
1151    else
1152      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
1153  }
1154}
1155catch(AhuException& ae)
1156{
1157  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
1158  throw;
1159}
1160;
1161
1162void makeThreadPipes()
1163{
1164  for(unsigned int n=0; n < g_numThreads; ++n) {
1165    struct ThreadPipeSet tps;
1166    int fd[2];
1167    if(pipe(fd) < 0)
1168      unixDie("Creating pipe for inter-thread communications");
1169   
1170    tps.readToThread = fd[0];
1171    tps.writeToThread = fd[1];
1172   
1173    if(pipe(fd) < 0)
1174      unixDie("Creating pipe for inter-thread communications");
1175    tps.readFromThread = fd[0];
1176    tps.writeFromThread = fd[1];
1177   
1178    g_pipes.push_back(tps);
1179  }
1180}
1181
1182struct ThreadMSG
1183{
1184  pipefunc_t func;
1185  bool wantAnswer;
1186};
1187
1188void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1189{
1190  unsigned int n = 0;
1191  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1192  {
1193    if(n++ == t_id) {
1194      if(!skipSelf)
1195        func(); // don't write to ourselves!
1196      continue;
1197    }
1198 
1199    ThreadMSG* tmsg = new ThreadMSG();
1200    tmsg->func = func;
1201    tmsg->wantAnswer = true;
1202    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1203      unixDie("write to thread pipe returned wrong size or error");
1204   
1205    string* resp;
1206    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1207      unixDie("read from thread pipe returned wrong size or error");
1208   
1209    if(resp) {
1210//      cerr <<"got response: " << *resp << endl;
1211      delete resp;
1212    }
1213  }
1214}
1215void distributeAsyncFunction(const pipefunc_t& func)
1216{
1217  static unsigned int counter;
1218  unsigned int target = 1 + (++counter % (g_pipes.size()-1));
1219  // cerr<<"Sending to: "<<target<<endl;
1220  if(target == t_id) {
1221    func();
1222    return;
1223  }
1224  ThreadPipeSet& tps = g_pipes[target];   
1225  ThreadMSG* tmsg = new ThreadMSG();
1226  tmsg->func = func;
1227  tmsg->wantAnswer = false;
1228 
1229  if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1230    unixDie("write to thread pipe returned wrong size or error");
1231   
1232}
1233
1234void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
1235{
1236  ThreadMSG* tmsg;
1237 
1238  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
1239    unixDie("read from thread pipe returned wrong size or error");
1240  }
1241 
1242  void *resp = tmsg->func();
1243  if(tmsg->wantAnswer)
1244    if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
1245      unixDie("write to thread pipe returned wrong size or error");
1246 
1247  delete tmsg;
1248}
1249
1250template<class T> void *voider(const boost::function<T*()>& func)
1251{
1252  return func();
1253}
1254
1255vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
1256{
1257  a.insert(a.end(), b.begin(), b.end());
1258  return a;
1259}
1260
1261template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
1262{
1263  unsigned int n = 0;
1264  T ret=T();
1265  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1266  {
1267    if(n++ == t_id) {
1268      if(!skipSelf) {
1269        T* resp = (T*)func(); // don't write to ourselves!
1270        if(resp) {
1271          //~ cerr <<"got direct: " << *resp << endl;
1272          ret += *resp;
1273          delete resp;
1274        }
1275      }
1276      continue;
1277    }
1278     
1279    ThreadMSG* tmsg = new ThreadMSG();
1280    tmsg->func = boost::bind(voider<T>, func);
1281    tmsg->wantAnswer = true;
1282 
1283    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1284      unixDie("write to thread pipe returned wrong size or error");
1285 
1286   
1287    T* resp;
1288    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1289      unixDie("read from thread pipe returned wrong size or error");
1290   
1291    if(resp) {
1292      //~ cerr <<"got response: " << *resp << endl;
1293      ret += *resp;
1294      delete resp;
1295    }
1296  }
1297  return ret;
1298}
1299
1300template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
1301template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
1302template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
1303
1304void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
1305{
1306  string remote;
1307  string msg=s_rcc.recv(&remote);
1308  RecursorControlParser rcp;
1309  RecursorControlParser::func_t* command;
1310  string answer=rcp.getAnswer(msg, &command);
1311  try {
1312    s_rcc.send(answer, &remote);
1313    command();
1314  }
1315  catch(std::exception& e) {
1316    L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1317  }
1318  catch(AhuException& ae) {
1319    L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1320  }
1321}
1322
1323void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
1324{
1325  PacketID* pident=any_cast<PacketID>(&var);
1326  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1327
1328  shared_array<char> buffer(new char[pident->inNeeded]);
1329
1330  int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1331  if(ret > 0) {
1332    pident->inMSG.append(&buffer[0], &buffer[ret]);
1333    pident->inNeeded-=ret;
1334    if(!pident->inNeeded) {
1335      //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1336      PacketID pid=*pident;
1337      string msg=pident->inMSG;
1338     
1339      t_fdm->removeReadFD(fd);
1340      MT->sendEvent(pid, &msg); 
1341    }
1342    else {
1343      //      cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1344    }
1345  }
1346  else {
1347    PacketID tmp=*pident;
1348    t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1349    string empty;
1350    MT->sendEvent(tmp, &empty); // this conveys error status
1351  }
1352}
1353
1354void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
1355{
1356  PacketID* pid=any_cast<PacketID>(&var);
1357  int ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
1358  if(ret > 0) {
1359    pid->outPos+=ret;
1360    if(pid->outPos==pid->outMSG.size()) {
1361      PacketID tmp=*pid;
1362      t_fdm->removeWriteFD(fd);
1363      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1364    }
1365  }
1366  else {  // error or EOF
1367    PacketID tmp(*pid);
1368    t_fdm->removeWriteFD(fd);
1369    string sent;
1370    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1371  }
1372}
1373
1374// resend event to everybody chained onto it
1375void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1376{
1377  if(iter->key.chain.empty())
1378    return;
1379  //  cerr<<"doResends called!\n";
1380  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1381    resend.fd=-1;
1382    resend.id=*i;
1383    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
1384
1385    MT->sendEvent(resend, &content);
1386    g_stats.chainResends++;
1387  }
1388}
1389
1390void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
1391{
1392  PacketID pid=any_cast<PacketID>(var);
1393  int len;
1394  char data[1500];
1395  ComboAddress fromaddr;
1396  socklen_t addrlen=sizeof(fromaddr);
1397
1398  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1399
1400  if(len < (int)sizeof(dnsheader)) {
1401    if(len < 0)
1402      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1403    else {
1404      g_stats.serverParseError++; 
1405      if(g_logCommonErrors)
1406        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
1407          ": packet smalller than DNS header"<<endl;
1408    }
1409
1410    t_udpclientsocks->returnSocket(fd);
1411    string empty;
1412
1413    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1414    if(iter != MT->d_waiters.end()) 
1415      doResends(iter, pid, empty);
1416   
1417    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1418    return;
1419  } 
1420
1421  dnsheader dh;
1422  memcpy(&dh, data, sizeof(dh));
1423 
1424  if(dh.qr) {
1425    PacketID pident;
1426    pident.remote=fromaddr;
1427    pident.id=dh.id;
1428    pident.fd=fd;
1429    if(!dh.qdcount) { // UPC, Nominum, very old BIND on FormErr, NSD
1430      pident.domain.clear();
1431      pident.type = 0;
1432    }
1433    else {
1434      try {
1435        pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1436      }
1437      catch(std::exception& e) {
1438        g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
1439        L<<Logger::Warning<<"Error in packet from "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
1440        return;
1441      }
1442    }
1443    string packet;
1444    packet.assign(data, len);
1445
1446    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1447    if(iter != MT->d_waiters.end()) {
1448      doResends(iter, pident, packet);
1449    }
1450
1451  retryWithName:
1452
1453    if(!MT->sendEvent(pident, &packet)) {
1454      // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
1455      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
1456        if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
1457           pdns_iequals(pident.domain, mthread->key.domain)) {
1458          mthread->key.nearMisses++;
1459        }
1460
1461        // be a bit paranoid here since we're weakening our matching
1462        if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type && 
1463           pident.id  == mthread->key.id && mthread->key.remote == pident.remote) {
1464          // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
1465          pident.domain = mthread->key.domain;
1466          pident.type = mthread->key.type;
1467          goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
1468        }
1469      }
1470      g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
1471      if(g_logCommonErrors) {
1472        L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<<pident.domain<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
1473      }
1474    }
1475    else if(fd >= 0) {
1476      t_udpclientsocks->returnSocket(fd);
1477    }
1478  }
1479  else
1480    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< fromaddr.toStringWithPort()  <<endl;
1481}
1482
1483FDMultiplexer* getMultiplexer()
1484{
1485  FDMultiplexer* ret;
1486  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1487      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1488    try {
1489      ret=i->second();
1490      return ret;
1491    }
1492    catch(FDMultiplexerException &fe) {
1493      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1494    }
1495    catch(...) {
1496      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1497    }
1498  }
1499  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1500  exit(1);
1501}
1502
1503 
1504void* doReloadLuaScript()
1505{
1506  string fname= ::arg()["lua-dns-script"];
1507  try {
1508    if(fname.empty()) {
1509      t_pdl->reset();
1510      L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
1511    }
1512    else {
1513      *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(fname));
1514    }
1515  }
1516  catch(std::exception& e) {
1517    L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
1518  }
1519   
1520  L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
1521  return 0;
1522}
1523
1524string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1525{
1526  if(begin != end) 
1527    ::arg().set("lua-dns-script") = *begin;
1528 
1529  broadcastFunction(doReloadLuaScript);
1530 
1531  return "ok, reload/unload queued\n";
1532} 
1533
1534void* recursorThread(void*);
1535
1536void* pleaseSupplantACLs(NetmaskGroup *ng)
1537{
1538  t_allowFrom = ng;
1539  return 0;
1540}
1541
1542void parseACLs()
1543{
1544  static bool l_initialized;
1545 
1546  if(l_initialized) { // only reload configuration file on second call
1547    string configname=::arg()["config-dir"]+"/recursor.conf";
1548    cleanSlashes(configname);
1549   
1550    if(!::arg().preParseFile(configname.c_str(), "allow-from-file")) 
1551      L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1552   
1553    ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
1554  }
1555
1556  NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
1557 
1558  if(!::arg()["allow-from-file"].empty()) {
1559    string line;
1560    ifstream ifs(::arg()["allow-from-file"].c_str());
1561    if(!ifs) {
1562      delete allowFrom; 
1563      throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
1564    }
1565
1566    string::size_type pos;
1567    while(getline(ifs,line)) {
1568      pos=line.find('#');
1569      if(pos!=string::npos)
1570        line.resize(pos);
1571      trim(line);
1572      if(line.empty())
1573        continue;
1574
1575      allowFrom->addMask(line);
1576    }
1577    L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
1578  }
1579  else if(!::arg()["allow-from"].empty()) {
1580    vector<string> ips;
1581    stringtok(ips, ::arg()["allow-from"], ", ");
1582   
1583    L<<Logger::Warning<<"Only allowing queries from: ";
1584    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1585      allowFrom->addMask(*i);
1586      if(i!=ips.begin())
1587        L<<Logger::Warning<<", ";
1588      L<<Logger::Warning<<*i;
1589    }
1590    L<<Logger::Warning<<endl;
1591  }
1592  else {
1593    if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53) 
1594      L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1595    delete allowFrom;
1596    allowFrom = 0;
1597  }
1598 
1599  g_initialAllowFrom = allowFrom;
1600  broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
1601  delete oldAllowFrom;
1602 
1603  l_initialized = true;
1604}
1605
1606int serviceMain(int argc, char*argv[])
1607{
1608  L.setName("pdns_recursor");
1609
1610  L.setLoglevel((Logger::Urgency)(6)); // info and up
1611
1612  if(!::arg()["logging-facility"].empty()) {
1613    int val=logFacilityToLOG(::arg().asNum("logging-facility") );
1614    if(val >= 0)
1615      theL().setFacility(val);
1616    else
1617      L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
1618  }
1619
1620  L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2012 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1621#ifdef __GNUC__
1622  L<<", gcc "__VERSION__;
1623#endif // add other compilers here
1624#ifdef _MSC_VER
1625  L<<", MSVC "<<_MSC_VER;
1626#endif
1627  L<<") starting up"<<endl;
1628 
1629  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1630    "This is free software, and you are welcome to redistribute it "
1631    "according to the terms of the GPL version 2."<<endl;
1632 
1633  L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1634 
1635  #if 0
1636  unsigned int maxFDs, curFDs;
1637  getFDLimits(curFDs, maxFDs);
1638  if(curFDs < 2048)
1639    L<<Logger::Warning<<"Only "<<curFDs<<" file descriptors available (out of: "<<maxFDs<<"), may not be suitable for high performance"<<endl;
1640  #endif
1641 
1642  seedRandom(::arg()["entropy-source"]);
1643
1644  parseACLs();
1645 
1646  if(!::arg()["dont-query"].empty()) {
1647    g_dontQuery=new NetmaskGroup;
1648    vector<string> ips;
1649    stringtok(ips, ::arg()["dont-query"], ", ");
1650    ips.push_back("0.0.0.0");
1651    ips.push_back("::");
1652
1653    L<<Logger::Warning<<"Will not send queries to: ";
1654    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1655      g_dontQuery->addMask(*i);
1656      if(i!=ips.begin())
1657        L<<Logger::Warning<<", ";
1658      L<<Logger::Warning<<*i;
1659    }
1660    L<<Logger::Warning<<endl;
1661  }
1662
1663  g_quiet=::arg().mustDo("quiet");
1664  g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
1665  if(g_weDistributeQueries) {
1666      L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
1667  }
1668 
1669  if(::arg().mustDo("trace")) {
1670    SyncRes::setLog(true);
1671    ::arg().set("quiet")="no";
1672    g_quiet=false;
1673  }
1674 
1675  try {
1676    vector<string> addrs; 
1677    if(!::arg()["query-local-address6"].empty()) {
1678      SyncRes::s_doIPv6=true;
1679      L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
1680     
1681      stringtok(addrs, ::arg()["query-local-address6"], ", ;");
1682      BOOST_FOREACH(const string& addr, addrs) {
1683        g_localQueryAddresses6.push_back(ComboAddress(addr));
1684      }
1685    }
1686    else {
1687      L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
1688    }
1689    addrs.clear();
1690    stringtok(addrs, ::arg()["query-local-address"], ", ;");
1691    BOOST_FOREACH(const string& addr, addrs) {
1692      g_localQueryAddresses4.push_back(ComboAddress(addr));
1693    }
1694  }
1695  catch(std::exception& e) {
1696    L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
1697    exit(99);
1698  }
1699
1700  SyncRes::s_doAAAAAdditionalProcessing = ::arg().mustDo("aaaa-additional-processing");
1701  SyncRes::s_doAdditionalProcessing = ::arg().mustDo("additional-processing") | SyncRes::s_doAAAAAdditionalProcessing;
1702 
1703  SyncRes::s_noEDNSPing = ::arg().mustDo("disable-edns-ping");
1704  SyncRes::s_noEDNS = ::arg().mustDo("disable-edns");
1705
1706  SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
1707
1708  SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1709  SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
1710  SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
1711  SyncRes::s_packetcacheservfailttl=::arg().asNum("packetcache-servfail-ttl");
1712  SyncRes::s_serverID=::arg()["server-id"];
1713  if(SyncRes::s_serverID.empty()) {
1714    char tmp[128];
1715    gethostname(tmp, sizeof(tmp)-1);
1716    SyncRes::s_serverID=tmp;
1717  }
1718 
1719  g_networkTimeoutMsec = ::arg().asNum("network-timeout");
1720
1721  g_initialDomainMap = parseAuthAndForwards();
1722 
1723   
1724  g_logCommonErrors=::arg().mustDo("log-common-errors");
1725 
1726  makeUDPServerSockets();
1727  makeTCPServerSockets();
1728
1729  int forks;
1730  for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
1731    if(!fork()) // we are child
1732      break;
1733  }
1734 
1735  s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
1736  if(!s_pidfname.empty())
1737    unlink(s_pidfname.c_str()); // remove possible old pid file
1738 
1739#ifndef WIN32
1740  if(::arg().mustDo("daemon")) {
1741    L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1742    L.toConsole(Logger::Critical);
1743    daemonize();
1744  }
1745  signal(SIGUSR1,usr1Handler);
1746  signal(SIGUSR2,usr2Handler);
1747  signal(SIGPIPE,SIG_IGN);
1748  writePid();
1749#endif
1750  makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
1751 
1752  int newgid=0;
1753  if(!::arg()["setgid"].empty())
1754    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1755  int newuid=0;
1756  if(!::arg()["setuid"].empty())
1757    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1758
1759  if (!::arg()["chroot"].empty()) {
1760    if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
1761      L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1762      exit(1);
1763    }
1764  }
1765
1766  Utility::dropPrivs(newuid, newgid);
1767  g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
1768 
1769  makeThreadPipes();
1770 
1771  g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1772  g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1773  g_maxMThreads=::arg().asNum("max-mthreads");
1774
1775  if(g_numThreads == 1) {
1776    L<<Logger::Warning<<"Operating unthreaded"<<endl;
1777    recursorThread(0);
1778  }
1779  else {
1780    pthread_t tid;
1781    L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
1782    for(unsigned int n=0; n < g_numThreads; ++n) {
1783      pthread_create(&tid, 0, recursorThread, (void*)n);
1784    }
1785    void* res;
1786
1787   
1788    pthread_join(tid, &res);
1789  }
1790  return 0;
1791}
1792
1793void* recursorThread(void* ptr)
1794try
1795{
1796  t_id=(int) (long) ptr;
1797  SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
1798  t_sstorage->domainmap = g_initialDomainMap;
1799  t_allowFrom = g_initialAllowFrom;
1800  t_udpclientsocks = new UDPClientSocks();
1801  t_tcpClientCounts = new tcpClientCounts_t();
1802  primeHints();
1803 
1804  t_packetCache = new RecursorPacketCache();
1805 
1806  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1807   
1808  t_RC->d_followRFC2181=::arg().mustDo("auth-can-lower-ttl");
1809  t_pdl = new shared_ptr<PowerDNSLua>();
1810 
1811  try {
1812    if(!::arg()["lua-dns-script"].empty()) {
1813      *t_pdl = shared_ptr<PowerDNSLua>(new PowerDNSLua(::arg()["lua-dns-script"]));
1814      L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
1815    }
1816   
1817  }
1818  catch(std::exception &e) {
1819    L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
1820    exit(99);
1821  }
1822 
1823  t_remotes = new RemoteKeeper();
1824  t_remotes->remotes.resize(::arg().asNum("remotes-ringbuffer-entries") / g_numThreads); 
1825 
1826  if(!t_remotes->remotes.empty())
1827    memset(&t_remotes->remotes[0], 0, t_remotes->remotes.size() * sizeof(RemoteKeeper::remotes_t::value_type));
1828 
1829 
1830  MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));
1831 
1832  PacketID pident;
1833
1834  t_fdm=getMultiplexer();
1835  if(!t_id) 
1836    L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
1837
1838  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
1839
1840  if(!g_weDistributeQueries || !t_id)  // if we distribute queries, only t_id = 0 listens
1841    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
1842      t_fdm->addReadFD(i->first, i->second);
1843 
1844  if(!t_id) {
1845    t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1846  }
1847
1848  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1849 
1850  bool listenOnTCP(true);
1851
1852  counter=0; // used to periodically execute certain tasks
1853  for(;;) {
1854    while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
1855     
1856    if(!(counter%500)) {
1857      MT->makeThread(houseKeeping, 0);
1858    }
1859
1860    if(!(counter%55)) {
1861      typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
1862      expired_t expired=t_fdm->getTimeouts(g_now);
1863       
1864      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1865        shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
1866        if(g_logCommonErrors)
1867          L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
1868        t_fdm->removeReadFD(i->first);
1869      }
1870    }
1871     
1872    counter++;
1873
1874    if(!t_id && statsWanted) {
1875      doStats();
1876    }
1877
1878    Utility::gettimeofday(&g_now, 0);
1879    t_fdm->run(&g_now);
1880    // 'run' updates g_now for us
1881
1882    if(listenOnTCP) {
1883      if(TCPConnection::getCurrentConnections() > maxTcpClients) {  // shutdown, too many connections
1884        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1885          t_fdm->removeReadFD(*i);
1886        listenOnTCP=false;
1887      }
1888    }
1889    else {
1890      if(TCPConnection::getCurrentConnections() <= maxTcpClients) {  // reenable
1891        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1892          t_fdm->addReadFD(*i, handleNewTCPQuestion);
1893        listenOnTCP=true;
1894      }
1895    }
1896  }
1897}
1898catch(AhuException &ae) {
1899  L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1900  return 0;
1901}
1902catch(std::exception &e) {
1903   L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1904   return 0;
1905}
1906catch(...) {
1907   L<<Logger::Error<<"any other exception in main: "<<endl;
1908   return 0;
1909}
1910
#ifdef WIN32
/**
 * Acts on the Windows service command line switches.
 *
 * "register-service" registers the service and exits with status 0, or
 * with status 99 when registration fails. "unregister-service"
 * unregisters it and exits with status 0. With neither switch set the
 * function simply returns.
 *
 * @param recursor  service object used to register or unregister.
 */
void doWindowsServiceArguments(RecursorService& recursor)
{
  const bool wantRegister = ::arg().mustDo( "register-service" );
  if(wantRegister) {
    const bool registered = recursor.registerService( "The PowerDNS Recursor.", true );
    if(registered)
      exit( 0 );

    cerr << "Could not register service." << endl;
    exit( 99 );
  }

  const bool wantUnregister = ::arg().mustDo( "unregister-service" );
  if(wantUnregister) {
    recursor.unregisterService();
    exit( 0 );
  }
}
#endif
1929
1930
1931int main(int argc, char **argv) 
1932{
1933  g_stats.startupTime=time(0);
1934  reportBasicTypes();
1935
1936  int ret = EXIT_SUCCESS;
1937#ifdef WIN32
1938  RecursorService service;
1939  WSADATA wsaData;
1940  if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
1941    cerr<<"Unable to initialize winsock\n";
1942    exit(1);
1943  }
1944#endif // WIN32
1945
1946  try {
1947    ::arg().set("stack-size","stack size per mthread")="200000";
1948    ::arg().set("soa-minimum-ttl","Don't change")="0";
1949    ::arg().set("soa-serial-offset","Don't change")="0";
1950    ::arg().set("no-shuffle","Don't change")="off";
1951    ::arg().set("additional-processing","turn on to do additional processing")="off";
1952    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
1953    ::arg().set("local-port","port to listen on")="53";
1954    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
1955    ::arg().set("trace","if we should output heaps of logging")="off";
1956    ::arg().set("daemon","Operate as a daemon")="yes";
1957    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
1958    ::arg().set("chroot","switch to chroot jail")="";
1959    ::arg().set("setgid","If set, change group id to this gid for more security")="";
1960    ::arg().set("setuid","If set, change user id to this uid for more security")="";
1961    ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
1962    ::arg().set("threads", "Launch this number of threads")="2";
1963    ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
1964#ifdef WIN32
1965    ::arg().set("quiet","Suppress logging of questions and answers")="off";
1966    ::arg().setSwitch( "register-service", "Register the service" )= "no";
1967    ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
1968    ::arg().setSwitch( "ntservice", "Run as service" )= "no";
1969    ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes"; 
1970    ::arg().setSwitch( "use-logfile", "Use a log file" )= "no"; 
1971    ::arg().setSwitch( "logfile", "Filename of the log file" )= "recursor.log"; 
1972#else
1973    ::arg().set("quiet","Suppress logging of questions and answers")="";
1974    ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
1975#endif
1976    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
1977#ifndef WIN32
1978    ::arg().set("socket-owner","Owner of socket")="";
1979    ::arg().set("socket-group","Group of socket")="";
1980    ::arg().set("socket-mode", "Permissions for socket")="";
1981#endif
1982   
1983    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
1984    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
1985    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
1986    ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
1987    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
1988    ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
1989    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
1990    ::arg().set("hint-file", "If set, load root hints from this file")="";
1991    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
1992    ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
1993    ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
1994    ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
1995    ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
1996    ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
1997    ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
1998    ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
1999    ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
2000    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
2001    ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
2002    ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
2003    ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=LOCAL_NETS; 
2004    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
2005    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
2006    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
2007    ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
2008    ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
2009    ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
2010    ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
2011    ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
2012    ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
2013    ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
2014    ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
2015    ::arg().set("auth-can-lower-ttl", "If we follow RFC 2181 to the letter, an authoritative server can lower the TTL of NS records")="off";
2016    ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
2017    ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatibility" )= "off"; 
2018    ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; 
2019    ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; 
2020    ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no"; 
2021    ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
2022   
2023
2024    ::arg().setCmd("help","Provide a helpful message");
2025    ::arg().setCmd("version","Print version string ("VERSION")");
2026    ::arg().setCmd("config","Output blank configuration");
2027    L.toConsole(Logger::Info);
2028    ::arg().laxParse(argc,argv); // do a lax parse
2029
2030    if(::arg().mustDo("config")) {
2031      cout<<::arg().configstring()<<endl;
2032      exit(0);
2033    }
2034
2035
2036    string configname=::arg()["config-dir"]+"/recursor.conf";
2037    cleanSlashes(configname);
2038
2039    if(!::arg().file(configname.c_str())) 
2040      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
2041
2042    ::arg().parse(argc,argv);
2043
2044    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
2045
2046    if(::arg().mustDo("help")) {
2047      cerr<<"syntax:"<<endl<<endl;
2048      cerr<<::arg().helpstring(::arg()["help"])<<endl;
2049      exit(99);
2050    }
2051    if(::arg().mustDo("version")) {
2052      cerr<<"version: "VERSION<<endl;
2053      exit(99);
2054    }
2055
2056#ifndef WIN32
2057    serviceMain(argc, argv);
2058#else
2059    doWindowsServiceArguments(service);
2060        L.toNTLog();
2061    RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" )); 
2062#endif
2063
2064  }
2065  catch(AhuException &ae) {
2066    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2067    ret=EXIT_FAILURE;
2068  }
2069  catch(std::exception &e) {
2070    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2071    ret=EXIT_FAILURE;
2072  }
2073  catch(...) {
2074    L<<Logger::Error<<"any other exception in main: "<<endl;
2075    ret=EXIT_FAILURE;
2076  }
2077 
2078#ifdef WIN32
2079  WSACleanup();
2080#endif // WIN32
2081
2082  return ret;
2083}
Note: See TracBrowser for help on using the browser.