root/trunk/pdns/pdns/pdns_recursor.cc

Revision 3153, 68.6 KB (checked in by ahu, 6 weeks ago)

if we failed to make a new UDP socket, we'd report a confusing error about it

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2003 - 2013  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#ifndef WIN32
20# include <netdb.h>
21# include <sys/stat.h>
22# include <unistd.h>
23#else
24 #include "ntservice.hh"
25 #include "recursorservice.hh"
26#endif // WIN32
27
28#include <boost/foreach.hpp>
29#include "json_ws.hh"
30#include <pthread.h>
31#include "recpacketcache.hh"
32#include "utility.hh"
33#include "dns_random.hh"
34#include <iostream>
35#include <errno.h>
36#include <map>
37#include <set>
38#include "recursor_cache.hh"
39#include "cachecleaner.hh"
40#include <stdio.h>
41#include <signal.h>
42#include <stdlib.h>
43#include "misc.hh"
44#include "mtasker.hh"
45#include <utility>
46#include "arguments.hh"
47#include "syncres.hh"
48#include <fcntl.h>
49#include <fstream>
50#include "sstuff.hh"
51#include <boost/tuple/tuple.hpp>
52#include <boost/tuple/tuple_comparison.hpp>
53#include <boost/shared_array.hpp>
54#include <boost/lexical_cast.hpp>
55#include <boost/function.hpp>
56#include <boost/algorithm/string.hpp>
57#include <netinet/tcp.h>
58#include "dnsparser.hh"
59#include "dnswriter.hh"
60#include "dnsrecords.hh"
61#include "zoneparser-tng.hh"
62#include "rec_channel.hh"
63#include "logger.hh"
64#include "iputils.hh"
65#include "mplexer.hh"
66#include "config.h"
67#include "lua-recursor.hh"
68
69#ifndef RECURSOR
70#include "statbag.hh"
71StatBag S;
72#endif
73
74__thread FDMultiplexer* t_fdm;
75__thread unsigned int t_id;
76unsigned int g_maxTCPPerClient;
77unsigned int g_networkTimeoutMsec;
78bool g_logCommonErrors;
79__thread shared_ptr<RecursorLua>* t_pdl;
80__thread RemoteKeeper* t_remotes;
81__thread shared_ptr<Regex>* t_traceRegex;
82
83RecursorControlChannel s_rcc; // only active in thread 0
84
// Pipe file descriptors used for communicating with one of our threads.
// Judging by the member names there is one pipe per direction, each stored as
// its write end followed by its read end.  Member order is kept as-is so any
// aggregate initialisation elsewhere keeps working.
struct ThreadPipeSet
{
  int writeToThread, readToThread;      // pipe towards the thread
  int writeFromThread, readFromThread;  // pipe back from the thread
};
93
94vector<ThreadPipeSet> g_pipes; // effectively readonly after startup
95
96SyncRes::domainmap_t* g_initialDomainMap; // new threads needs this to be setup
97
98#include "namespaces.hh"
99
100__thread MemRecursorCache* t_RC;
101__thread RecursorPacketCache* t_packetCache;
102RecursorStats g_stats;
103bool g_quiet;
104
105bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets
106
107static __thread NetmaskGroup* t_allowFrom;
108static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this
109
110NetmaskGroup* g_dontQuery;
111string s_programname="pdns_recursor";
112
113typedef vector<int> tcpListenSockets_t;
114tcpListenSockets_t g_tcpListenSockets;   // shared across threads, but this is fine, never written to from a thread. All threads listen on all sockets
115int g_tcpTimeout;
116unsigned int g_maxMThreads;
117struct timeval g_now; // timestamp, updated (too) frequently
118map<int, ComboAddress> g_listenSocketsAddresses; // is shared across all threads right now
119
120__thread MT_t* MT; // the big MTasker
121
122unsigned int g_numThreads;
123
124#define LOCAL_NETS "127.0.0.0/8, 10.0.0.0/8, 100.64.0.0/10, 169.254.0.0/16, 192.168.0.0/16, 172.16.0.0/12, ::1/128, fe80::/10"
125
126//! used to send information to a newborn mthread
127struct DNSComboWriter {
128  DNSComboWriter(const char* data, uint16_t len, const struct timeval& now) : d_mdp(data, len), d_now(now), 
129                                                                                                        d_tcp(false), d_socket(-1)
130  {}
131  MOADNSParser d_mdp;
132  void setRemote(const ComboAddress* sa)
133  {
134    d_remote=*sa;
135  }
136
137  void setSocket(int sock)
138  {
139    d_socket=sock;
140  }
141
142  string getRemote() const
143  {
144    return d_remote.toString();
145  }
146
147  struct timeval d_now;
148  ComboAddress d_remote;
149  bool d_tcp;
150  int d_socket;
151  shared_ptr<TCPConnection> d_tcpConnection;
152};
153
154
155ArgvMap &arg()
156{
157  static ArgvMap theArg;
158  return theArg;
159}
160
161
162void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
163
164// -1 is error, 0 is timeout, 1 is success
165int asendtcp(const string& data, Socket* sock) 
166{
167  PacketID pident;
168  pident.sock=sock;
169  pident.outMSG=data;
170 
171  t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
172  string packet;
173
174  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
175
176  if(!ret || ret==-1) { // timeout
177    t_fdm->removeWriteFD(sock->getHandle());
178  }
179  else if(packet.size() !=data.size()) { // main loop tells us what it sent out, or empty in case of an error
180    return -1;
181  }
182  return ret;
183}
184
185void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
186
187// -1 is error, 0 is timeout, 1 is success
188int arecvtcp(string& data, int len, Socket* sock) 
189{
190  data.clear();
191  PacketID pident;
192  pident.sock=sock;
193  pident.inNeeded=len;
194  t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
195
196  int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
197  if(!ret || ret==-1) { // timeout
198    t_fdm->removeReadFD(sock->getHandle());
199  }
200  else if(data.empty()) {// error, EOF or other
201    return -1;
202  }
203
204  return ret;
205}
206
207vector<ComboAddress> g_localQueryAddresses4, g_localQueryAddresses6; 
208const ComboAddress g_local4("0.0.0.0"), g_local6("::");
209
210//! pick a random query local address
211ComboAddress getQueryLocalAddress(int family, uint16_t port)
212{
213  ComboAddress ret;
214  if(family==AF_INET) {
215    if(g_localQueryAddresses4.empty()) 
216      ret = g_local4;
217    else 
218      ret = g_localQueryAddresses4[dns_random(g_localQueryAddresses4.size())];
219    ret.sin4.sin_port = htons(port);
220  }
221  else {
222    if(g_localQueryAddresses6.empty())
223      ret = g_local6;
224    else
225      ret = g_localQueryAddresses6[dns_random(g_localQueryAddresses6.size())];
226     
227    ret.sin6.sin6_port = htons(port);
228  }
229  return ret;
230}
231
232void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t&);
233
234void setSocketBuffer(int fd, int optname, uint32_t size)
235{
236  uint32_t psize=0;
237  socklen_t len=sizeof(psize);
238 
239  if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
240    L<<Logger::Error<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
241    return; 
242  }
243
244  if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
245    L<<Logger::Error<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
246}
247
248
249static void setSocketReceiveBuffer(int fd, uint32_t size)
250{
251  setSocketBuffer(fd, SO_RCVBUF, size);
252}
253
254static void setSocketSendBuffer(int fd, uint32_t size)
255{
256  setSocketBuffer(fd, SO_SNDBUF, size);
257}
258
259
260// you can ask this class for a UDP socket to send a query from
261// this socket is not yours, don't even think about deleting it
262// but after you call 'returnSocket' on it, don't assume anything anymore
263class UDPClientSocks
264{
265  unsigned int d_numsocks;
266  unsigned int d_maxsocks;
267public:
268  UDPClientSocks() : d_numsocks(0), d_maxsocks(5000)
269  {
270  }
271
272  typedef set<int> socks_t;
273  socks_t d_socks;
274
275  // returning -1 means: temporary OS error (ie, out of files), -2 means OS error
276  int getSocket(const ComboAddress& toaddr, int* fd)
277  {
278    *fd=makeClientSocket(toaddr.sin4.sin_family);
279    if(*fd < 0) // temporary error - receive exception otherwise
280      return -1;
281
282    if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
283      int err = errno;
284      //      returnSocket(*fd);
285      Utility::closesocket(*fd);
286      if(err==ENETUNREACH) // Seth "My Interfaces Are Like A Yo Yo" Arnold special
287        return -2;
288      return -1;
289    }
290
291    d_socks.insert(*fd);
292    d_numsocks++;
293    return 0;
294  }
295
296  void returnSocket(int fd)
297  {
298    socks_t::iterator i=d_socks.find(fd);
299    if(i==d_socks.end()) {
300      throw AhuException("Trying to return a socket (fd="+lexical_cast<string>(fd)+") not in the pool");
301    }
302    returnSocketLocked(i);
303  }
304
305  // return a socket to the pool, or simply erase it
306  void returnSocketLocked(socks_t::iterator& i)
307  {
308    if(i==d_socks.end()) {
309      throw AhuException("Trying to return a socket not in the pool");
310    }
311    try {
312      t_fdm->removeReadFD(*i);
313    }
314    catch(FDMultiplexerException& e) {
315      // we sometimes return a socket that has not yet been assigned to t_fdm
316    }
317    Utility::closesocket(*i);
318   
319    d_socks.erase(i++);
320    --d_numsocks;
321  }
322
323  // returns -1 for errors which might go away, throws for ones that won't
324  static int makeClientSocket(int family)
325  {
326    int ret=(int)socket(family, SOCK_DGRAM, 0);
327
328    if(ret < 0 && errno==EMFILE) // this is not a catastrophic error
329      return ret;
330   
331    if(ret<0) 
332      throw AhuException("Making a socket for resolver (family =)"+lexical_cast<string>(family)+"): "+stringerror());
333
334    Utility::setCloseOnExec(ret);
335
336    int tries=10;
337    while(--tries) {
338      uint16_t port;
339     
340      if(tries==1)  // fall back to kernel 'random'
341        port = 0;
342      else
343        port = 1025 + dns_random(64510);
344     
345      ComboAddress sin=getQueryLocalAddress(family, port); // does htons for us
346
347      if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0) 
348        break;
349    }
350    if(!tries)
351      throw AhuException("Resolver binding to local query client socket: "+stringerror());
352   
353    Utility::setNonBlocking(ret);
354    return ret;
355  }
356};
357
358static __thread UDPClientSocks* t_udpclientsocks;
359
360/* these two functions are used by LWRes */
361// -2 is OS error, -1 is error that depends on the remote, > 0 is success
362int asendto(const char *data, int len, int flags, 
363            const ComboAddress& toaddr, uint16_t id, const string& domain, uint16_t qtype, int* fd) 
364{
365
366  PacketID pident;
367  pident.domain = domain;
368  pident.remote = toaddr;
369  pident.type = qtype;
370
371  // see if there is an existing outstanding request we can chain on to, using partial equivalence function
372  pair<MT_t::waiters_t::iterator, MT_t::waiters_t::iterator> chain=MT->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
373
374  for(; chain.first != chain.second; chain.first++) {
375    if(chain.first->key.fd > -1) { // don't chain onto existing chained waiter!
376      /*
377      cerr<<"Orig: "<<pident.domain<<", "<<pident.remote.toString()<<", id="<<id<<endl;
378      cerr<<"Had hit: "<< chain.first->key.domain<<", "<<chain.first->key.remote.toString()<<", id="<<chain.first->key.id
379          <<", count="<<chain.first->key.chain.size()<<", origfd: "<<chain.first->key.fd<<endl;
380      */
381      chain.first->key.chain.insert(id); // we can chain
382      *fd=-1;                            // gets used in waitEvent / sendEvent later on
383      return 1;
384    }
385  }
386
387  int ret=t_udpclientsocks->getSocket(toaddr, fd);
388  if(ret < 0)
389    return ret;
390
391  pident.fd=*fd;
392  pident.id=id;
393 
394  t_fdm->addReadFD(*fd, handleUDPServerResponse, pident);
395  ret = send(*fd, data, len, 0);
396
397  int tmp = errno;
398
399  if(ret < 0)
400    t_udpclientsocks->returnSocket(*fd);
401
402  errno = tmp; // this is for logging purposes only
403  return ret;
404}
405
406// -1 is error, 0 is timeout, 1 is success
407int arecvfrom(char *data, int len, int flags, const ComboAddress& fromaddr, int *d_len, 
408              uint16_t id, const string& domain, uint16_t qtype, int fd, struct timeval* now)
409{
410  static optional<unsigned int> nearMissLimit;
411  if(!nearMissLimit) 
412    nearMissLimit=::arg().asNum("spoof-nearmiss-max");
413
414  PacketID pident;
415  pident.fd=fd;
416  pident.id=id;
417  pident.domain=domain;
418  pident.type = qtype;
419  pident.remote=fromaddr;
420
421  string packet;
422  int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
423
424  if(ret > 0) {
425    if(packet.empty()) // means "error"
426      return -1; 
427
428    *d_len=(int)packet.size();
429    memcpy(data,packet.c_str(),min(len,*d_len));
430    if(*nearMissLimit && pident.nearMisses > *nearMissLimit) {
431      L<<Logger::Error<<"Too many ("<<pident.nearMisses<<" > "<<*nearMissLimit<<") bogus answers for '"<<domain<<"' from "<<fromaddr.toString()<<", assuming spoof attempt."<<endl;
432      g_stats.spoofCount++;
433      return -1;
434    }
435  }
436  else {
437    if(fd >= 0)
438      t_udpclientsocks->returnSocket(fd);
439  }
440  return ret;
441}
442
443
444string s_pidfname;
445static void writePid(void)
446{
447  ofstream of(s_pidfname.c_str(), std::ios_base::app);
448  if(of)
449    of<< Utility::getpid() <<endl;
450  else
451    L<<Logger::Error<<"Requested to write pid for "<<Utility::getpid()<<" to "<<s_pidfname<<" failed: "<<strerror(errno)<<endl;
452}
453
454typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
455tcpClientCounts_t __thread* t_tcpClientCounts;
456
457TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : d_remote(addr), d_fd(fd)
458{ 
459  ++s_currentConnections; 
460  (*t_tcpClientCounts)[d_remote]++;
461}
462
463TCPConnection::~TCPConnection()
464{
465  if(Utility::closesocket(d_fd) < 0) 
466    unixDie("closing socket for TCPConnection");
467  if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--) 
468    t_tcpClientCounts->erase(d_remote);
469  --s_currentConnections;
470}
471
472AtomicCounter TCPConnection::s_currentConnections; 
473void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
474
475void updateRcodeStats(int res)
476{
477  switch(res) {
478  case RCode::ServFail:
479    g_stats.servFails++;
480    break;
481  case RCode::NXDomain:
482    g_stats.nxDomains++;
483    break;
484  case RCode::NoError:
485    g_stats.noErrors++;
486    break;
487  }
488}
489
490void startDoResolve(void *p)
491{
492  DNSComboWriter* dc=(DNSComboWriter *)p;
493
494  try {
495    uint32_t maxanswersize= dc->d_tcp ? 65535 : 512;
496    EDNSOpts edo;
497    if(getEDNSOpts(dc->d_mdp, &edo)) {
498      maxanswersize = min(edo.d_packetsize, (uint16_t) (dc->d_tcp ? 65535 : 1680));
499    }
500   
501    vector<DNSResourceRecord> ret;
502    vector<uint8_t> packet;
503
504    DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); 
505
506    pw.getHeader()->aa=0;
507    pw.getHeader()->ra=1;
508    pw.getHeader()->qr=1;
509    pw.getHeader()->tc=0;
510    pw.getHeader()->id=dc->d_mdp.d_header.id;
511    pw.getHeader()->rd=dc->d_mdp.d_header.rd;
512
513    SyncRes sr(dc->d_now);
514    bool tracedQuery=false; // we could consider letting Lua know about this too
515    if(t_traceRegex->get() && (*t_traceRegex)->match(dc->d_mdp.d_qname)) {
516      sr.setLogMode(SyncRes::Store);
517      tracedQuery=true;
518    }
519   
520    if(!g_quiet || tracedQuery)
521      L<<Logger::Warning<<t_id<<" ["<<MT->getTid()<<"] " << (dc->d_tcp ? "TCP " : "") << "question for '"<<dc->d_mdp.d_qname<<"|"
522       <<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype)<<"' from "<<dc->getRemote()<<endl;
523
524    sr.setId(MT->getTid());
525    if(!dc->d_mdp.d_header.rd)
526      sr.setCacheOnly();
527
528    int res;
529
530    bool variableAnswer = false;
531    // if there is a RecursorLua active, and it 'took' the query in preResolve, we don't launch beginResolve
532    if(!t_pdl->get() || !(*t_pdl)->preresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer)) {
533       res = sr.beginResolve(dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), dc->d_mdp.d_qclass, ret);
534
535      if(t_pdl->get()) {
536        if(res == RCode::NoError) {
537          vector<DNSResourceRecord>::const_iterator i;
538          for(i=ret.begin(); i!=ret.end(); ++i) 
539              if(i->qtype.getCode() == dc->d_mdp.d_qtype && i->d_place == DNSResourceRecord::ANSWER)
540                        break;
541          if(i == ret.end())
542            (*t_pdl)->nodata(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
543        }
544        else if(res == RCode::NXDomain)
545          (*t_pdl)->nxdomain(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
546     
547      (*t_pdl)->postresolve(dc->d_remote, g_listenSocketsAddresses[dc->d_socket], dc->d_mdp.d_qname, QType(dc->d_mdp.d_qtype), ret, res, &variableAnswer);
548      }
549    }
550   
551    uint32_t minTTL=std::numeric_limits<uint32_t>::max();
552     
553    if(tracedQuery || res < 0 || res == RCode::ServFail || pw.getHeader()->rcode == RCode::ServFail)
554    {
555      string trace(sr.getTrace());
556      if(!trace.empty()) {
557        vector<string> lines;
558        boost::split(lines, trace, boost::is_any_of("\n"));
559        BOOST_FOREACH(const string& line, lines) {
560          if(!line.empty())
561            L<<Logger::Warning<< line << endl;
562        }
563      }
564    }
565
566    if(res < 0) {
567      pw.getHeader()->rcode=RCode::ServFail;
568      // no commit here, because no record
569      g_stats.servFails++;
570    }
571    else {
572      pw.getHeader()->rcode=res;
573      updateRcodeStats(res);
574     
575      if(ret.size()) {
576        orderAndShuffle(ret);
577       
578        for(vector<DNSResourceRecord>::const_iterator i=ret.begin(); i!=ret.end(); ++i) {
579          pw.startRecord(i->qname, i->qtype.getCode(), i->ttl, i->qclass, (DNSPacketWriter::Place)i->d_place); 
580          minTTL = min(minTTL, i->ttl);
581          if(i->qtype.getCode() == QType::A) { // blast out A record w/o doing whole dnswriter thing
582            uint32_t ip=0;
583            IpToU32(i->content, &ip);
584            pw.xfr32BitInt(htonl(ip));
585          } else {
586            shared_ptr<DNSRecordContent> drc(DNSRecordContent::mastermake(i->qtype.getCode(), i->qclass, i->content)); 
587            drc->toPacket(pw);
588          }
589          if(pw.size() > maxanswersize) {
590            pw.rollback();
591            if(i->d_place==DNSResourceRecord::ANSWER)  // only truncate if we actually omitted parts of the answer
592              pw.getHeader()->tc=1;
593            goto sendit; // need to jump over pw.commit
594          }
595        }
596
597      pw.commit();
598      }
599    }
600  sendit:;
601    if(!dc->d_tcp) {
602      sendto(dc->d_socket, (const char*)&*packet.begin(), packet.size(), 0, (struct sockaddr *)(&dc->d_remote), dc->d_remote.getSocklen());
603      if(!SyncRes::s_nopacketcache && !variableAnswer ) {
604        t_packetCache->insertResponsePacket(string((const char*)&*packet.begin(), packet.size()), g_now.tv_sec, 
605                                           min(minTTL, 
606                                               (pw.getHeader()->rcode == RCode::ServFail) ? SyncRes::s_packetcacheservfailttl : SyncRes::s_packetcachettl
607                                               ) 
608                                          );
609      }
610    }
611    else {
612      char buf[2];
613      buf[0]=packet.size()/256;
614      buf[1]=packet.size()%256;
615
616      Utility::iovec iov[2];
617
618      iov[0].iov_base=(void*)buf;              iov[0].iov_len=2;
619      iov[1].iov_base=(void*)&*packet.begin(); iov[1].iov_len = packet.size();
620
621      int ret=Utility::writev(dc->d_socket, iov, 2);
622      bool hadError=true;
623
624      if(ret == 0) 
625        L<<Logger::Error<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
626      else if(ret < 0 ) 
627        L<<Logger::Error<<"Error writing TCP answer to "<<dc->getRemote()<<": "<< strerror(errno) <<endl;
628      else if((unsigned int)ret != 2 + packet.size())
629        L<<Logger::Error<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<ret<<")"<<endl;
630      else
631        hadError=false;
632     
633      // update tcp connection status, either by closing or moving to 'BYTE0'
634   
635      if(hadError) {
636        // no need to remove us from FDM, we weren't there
637        dc->d_socket = -1;
638      }
639      else {
640        dc->d_tcpConnection->state=TCPConnection::BYTE0;
641        Utility::gettimeofday(&g_now, 0); // needs to be updated
642        t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
643        t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
644      }
645    }
646   
647    if(!g_quiet) {
648      L<<Logger::Error<<t_id<<" ["<<MT->getTid()<<"] answer to "<<(dc->d_mdp.d_header.rd?"":"non-rd ")<<"question '"<<dc->d_mdp.d_qname<<"|"<<DNSRecordContent::NumberToType(dc->d_mdp.d_qtype);
649      L<<"': "<<ntohs(pw.getHeader()->ancount)<<" answers, "<<ntohs(pw.getHeader()->arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
650      sr.d_throttledqueries<<" throttled, "<<sr.d_timeouts<<" timeouts, "<<sr.d_tcpoutqueries<<" tcp connections, rcode="<<res<<endl;
651    }
652
653    sr.d_outqueries ? t_RC->cacheMisses++ : t_RC->cacheHits++; 
654    float spent=makeFloat(sr.d_now-dc->d_now);
655    if(spent < 0.001)
656      g_stats.answers0_1++;
657    else if(spent < 0.010)
658      g_stats.answers1_10++;
659    else if(spent < 0.1)
660      g_stats.answers10_100++;
661    else if(spent < 1.0)
662      g_stats.answers100_1000++;
663    else
664      g_stats.answersSlow++;
665
666    uint64_t newLat=(uint64_t)(spent*1000000);
667    if(newLat < 1000000)  // outliers of several minutes exist..
668      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0.0001*newLat);
669
670    delete dc;
671    dc=0;
672  }
673  catch(AhuException &ae) {
674    L<<Logger::Error<<"startDoResolve problem: "<<ae.reason<<endl;
675    delete dc;
676  }
677  catch(MOADNSException& e) {
678    L<<Logger::Error<<"DNS parser error: "<<dc->d_mdp.d_qname<<", "<<e.what()<<endl;
679    delete dc;
680  }
681  catch(std::exception& e) {
682    L<<Logger::Error<<"STL error: "<<e.what()<<endl;
683    delete dc;
684  }
685  catch(...) {
686    L<<Logger::Error<<"Any other exception in a resolver context"<<endl;
687  }
688 
689  g_stats.maxMThreadStackUsage = max(MT->getMaxStackUsage(), g_stats.maxMThreadStackUsage);
690}
691
692void makeControlChannelSocket(int processNum=-1)
693{
694  string sockname=::arg()["socket-dir"]+"/pdns_recursor";
695  if(processNum >= 0)
696    sockname += "."+lexical_cast<string>(processNum);
697  sockname+=".controlsocket";
698  s_rcc.listen(sockname);
699 
700#ifndef WIN32
701  int sockowner = -1;
702  int sockgroup = -1;
703
704  if (!::arg().isEmpty("socket-group"))
705    sockgroup=::arg().asGid("socket-group");
706  if (!::arg().isEmpty("socket-owner"))
707    sockowner=::arg().asUid("socket-owner");
708 
709  if (sockgroup > -1 || sockowner > -1) {
710    if(chown(sockname.c_str(), sockowner, sockgroup) < 0) {
711      unixDie("Failed to chown control socket");
712    }
713  }
714
715  // do mode change if socket-mode is given
716  if(!::arg().isEmpty("socket-mode")) {
717    mode_t sockmode=::arg().asMode("socket-mode");
718    chmod(sockname.c_str(), sockmode);
719  }
720#endif
721}
722
723void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
724{
725  shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(var);
726
727  if(conn->state==TCPConnection::BYTE0) {
728    int bytes=recv(conn->getFD(), conn->data, 2, 0);
729    if(bytes==1)
730      conn->state=TCPConnection::BYTE1;
731    if(bytes==2) { 
732      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
733      conn->bytesread=0;
734      conn->state=TCPConnection::GETQUESTION;
735    }
736    if(!bytes || bytes < 0) {
737      t_fdm->removeReadFD(fd);
738      return;
739    }
740  }
741  else if(conn->state==TCPConnection::BYTE1) {
742    int bytes=recv(conn->getFD(), conn->data+1, 1, 0);
743    if(bytes==1) {
744      conn->state=TCPConnection::GETQUESTION;
745      conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
746      conn->bytesread=0;
747    }
748    if(!bytes || bytes < 0) {
749      if(g_logCommonErrors)
750        L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected after first byte"<<endl;
751      t_fdm->removeReadFD(fd);
752      return;
753    }
754  }
755  else if(conn->state==TCPConnection::GETQUESTION) {
756    int bytes=recv(conn->getFD(), conn->data + conn->bytesread, conn->qlen - conn->bytesread, 0);
757    if(!bytes || bytes < 0) {
758      L<<Logger::Error<<"TCP client "<< conn->d_remote.toString() <<" disconnected while reading question body"<<endl;
759      t_fdm->removeReadFD(fd);
760      return;
761    }
762    conn->bytesread+=bytes;
763    if(conn->bytesread==conn->qlen) {
764      t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
765
766      DNSComboWriter* dc=0;
767      try {
768        dc=new DNSComboWriter(conn->data, conn->qlen, g_now);
769      }
770      catch(MOADNSException &mde) {
771        g_stats.clientParseError++; 
772        if(g_logCommonErrors)
773          L<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toString() <<endl;
774        return;
775      }
776      dc->d_tcpConnection = conn; // carry the torch
777      dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
778      dc->d_tcp=true;
779      dc->setRemote(&conn->d_remote);
780      if(dc->d_mdp.d_header.qr) {
781        delete dc;
782        L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
783        return;
784      }
785      else {
786        ++g_stats.qcounter;
787        ++g_stats.tcpqcounter;
788        MT->makeThread(startDoResolve, dc); // deletes dc, will set state to BYTE0 again
789        return;
790      }
791    }
792  }
793}
794
795//! Handle new incoming TCP connection
796void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
797{
798  ComboAddress addr;
799  socklen_t addrlen=sizeof(addr);
800  int newsock=(int)accept(fd, (struct sockaddr*)&addr, &addrlen);
801  if(newsock>0) {
802    if(MT->numProcesses() > g_maxMThreads) {
803      g_stats.overCapacityDrops++;
804      Utility::closesocket(newsock);
805      return;
806    }
807
808    t_remotes->addRemote(addr);
809    if(t_allowFrom && !t_allowFrom->match(&addr)) {
810      if(!g_quiet) 
811        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address not matched by allow-from"<<endl;
812
813      g_stats.unauthorizedTCP++;
814      Utility::closesocket(newsock);
815      return;
816    }
817    if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
818      g_stats.tcpClientOverflow++;
819      Utility::closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
820      return;
821    }
822   
823    Utility::setNonBlocking(newsock);
824    shared_ptr<TCPConnection> tc(new TCPConnection(newsock, addr));
825    tc->state=TCPConnection::BYTE0;
826   
827    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
828
829    struct timeval now;
830    Utility::gettimeofday(&now, 0);
831    t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
832  }
833}
834 
835string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd)
836{
837  ++g_stats.qcounter;
838  if(fromaddr.sin4.sin_family==AF_INET6)
839     g_stats.ipv6qcounter++;
840
841  string response;
842  try {
843    uint32_t age;
844    if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) {
845      if(!g_quiet)
846        L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl;
847
848      g_stats.packetCacheHits++;
849      SyncRes::s_queries++;
850      ageDNSPacket(response, age);
851      sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen());
852      if(response.length() >= sizeof(struct dnsheader)) {
853        struct dnsheader dh;
854        memcpy(&dh, response.c_str(), sizeof(dh));
855        updateRcodeStats(dh.rcode);
856      }
857      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec
858      return 0;
859    }
860  } 
861  catch(std::exception& e) {
862    L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl;
863    return 0;
864  }
865 
866 
867  if(MT->numProcesses() > g_maxMThreads) {
868    g_stats.overCapacityDrops++;
869    return 0;
870  }
871 
872  DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now);
873  dc->setSocket(fd);
874  dc->setRemote(&fromaddr);
875
876  dc->d_tcp=false;
877  MT->makeThread(startDoResolve, (void*) dc); // deletes dc
878  return 0;
879} 
880 
881void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var)
882{
883  int len;
884  char data[1500];
885  ComboAddress fromaddr;
886  socklen_t addrlen=sizeof(fromaddr);
887 
888  if((len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen)) >= 0) {
889    t_remotes->addRemote(fromaddr);
890
891    if(t_allowFrom && !t_allowFrom->match(&fromaddr)) {
892      if(!g_quiet) 
893        L<<Logger::Error<<"["<<MT->getTid()<<"] dropping UDP query from "<<fromaddr.toString()<<", address not matched by allow-from"<<endl;
894
895      g_stats.unauthorizedUDP++;
896      return;
897    }
898    try {
899      dnsheader* dh=(dnsheader*)data;
900     
901      if(dh->qr) {
902        if(g_logCommonErrors)
903          L<<Logger::Error<<"Ignoring answer from "<<fromaddr.toString()<<" on server socket!"<<endl;
904      }
905      else {
906        string question(data, len);
907        if(g_weDistributeQueries)
908          distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd));
909        else
910          doProcessUDPQuestion(question, fromaddr, fd);
911      }
912    }
913    catch(MOADNSException& mde) {
914      g_stats.clientParseError++; 
915      if(g_logCommonErrors)
916        L<<Logger::Error<<"Unable to parse packet from remote UDP client "<<fromaddr.toString() <<": "<<mde.what()<<endl;
917    }
918  }
919  else {
920    // cerr<<t_id<<" had error: "<<stringerror()<<endl;
921    if(errno == EAGAIN)
922      g_stats.noPacketError++;
923  }
924}
925
926
927typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t;
928deferredAdd_t deferredAdd;
929
930void makeTCPServerSockets()
931{
932  int fd;
933  vector<string>locals;
934  stringtok(locals,::arg()["local-address"]," ,");
935
936  if(locals.empty())
937    throw AhuException("No local address specified");
938 
939  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
940    ServiceTuple st;
941    st.port=::arg().asNum("local-port");
942    parseService(*i, st);
943   
944    ComboAddress sin;
945
946    memset((char *)&sin,0, sizeof(sin));
947    sin.sin4.sin_family = AF_INET;
948    if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
949      sin.sin6.sin6_family = AF_INET6;
950      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
951        throw AhuException("Unable to resolve local address for TCP server on '"+ st.host +"'"); 
952    }
953
954    fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
955    Utility::setCloseOnExec(fd);
956
957    if(fd<0) 
958      throw AhuException("Making a TCP server socket for resolver: "+stringerror());
959
960    int tmp=1;
961    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
962      L<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
963      exit(1);
964    }
965   
966#ifdef TCP_DEFER_ACCEPT
967    if(setsockopt(fd, SOL_TCP,TCP_DEFER_ACCEPT,(char*)&tmp,sizeof tmp) >= 0) {
968      if(i==locals.begin())
969        L<<Logger::Error<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
970    }
971#endif
972
973    sin.sin4.sin_port = htons(st.port);
974    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
975    if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) 
976      throw AhuException("Binding TCP server socket for "+ st.host +": "+stringerror());
977   
978    Utility::setNonBlocking(fd);
979    setSocketSendBuffer(fd, 65000);
980    listen(fd, 128);
981    deferredAdd.push_back(make_pair(fd, handleNewTCPQuestion));
982    g_tcpListenSockets.push_back(fd);
983
984    if(sin.sin4.sin_family == AF_INET) 
985      L<<Logger::Error<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
986    else
987      L<<Logger::Error<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
988  }
989}
990
991
992
993void makeUDPServerSockets()
994{
995  vector<string>locals;
996  stringtok(locals,::arg()["local-address"]," ,");
997
998  if(locals.empty())
999    throw AhuException("No local address specified");
1000 
1001  if(::arg()["local-address"]=="0.0.0.0") {
1002    L<<Logger::Warning<<"It is advised to bind to explicit addresses with the --local-address option"<<endl;
1003  }
1004
1005  for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
1006    ServiceTuple st;
1007    st.port=::arg().asNum("local-port");
1008    parseService(*i, st);
1009
1010    ComboAddress sin;
1011
1012    memset(&sin, 0, sizeof(sin));
1013    sin.sin4.sin_family = AF_INET;
1014    if(!IpToU32(st.host.c_str() , (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
1015      sin.sin6.sin6_family = AF_INET6;
1016      if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
1017        throw AhuException("Unable to resolve local address for UDP server on '"+ st.host +"'"); 
1018    }
1019   
1020    int fd=socket(sin.sin4.sin_family, SOCK_DGRAM, 0);
1021    Utility::setCloseOnExec(fd);
1022
1023    if(fd < 0) {
1024      throw AhuException("Making a UDP server socket for resolver: "+netstringerror());
1025    }
1026
1027    setSocketReceiveBuffer(fd, 200000);
1028    sin.sin4.sin_port = htons(st.port);
1029
1030    int socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
1031    if (::bind(fd, (struct sockaddr *)&sin, socklen)<0) 
1032      throw AhuException("Resolver binding to server socket on port "+ lexical_cast<string>(st.port) +" for "+ st.host+": "+stringerror());
1033   
1034    Utility::setNonBlocking(fd);
1035
1036    deferredAdd.push_back(make_pair(fd, handleNewUDPQuestion));
1037    g_listenSocketsAddresses[fd]=sin;  // this is written to only from the startup thread, not from the workers
1038    if(sin.sin4.sin_family == AF_INET) 
1039      L<<Logger::Error<<"Listening for UDP queries on "<< sin.toString() <<":"<<st.port<<endl;
1040    else
1041      L<<Logger::Error<<"Listening for UDP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
1042  }
1043}
1044
1045
1046#ifndef WIN32
1047void daemonize(void)
1048{
1049  if(fork())
1050    exit(0); // bye bye
1051 
1052  setsid(); 
1053
1054  int i=open("/dev/null",O_RDWR); /* open stdin */
1055  if(i < 0) 
1056    L<<Logger::Critical<<"Unable to open /dev/null: "<<stringerror()<<endl;
1057  else {
1058    dup2(i,0); /* stdin */
1059    dup2(i,1); /* stderr */
1060    dup2(i,2); /* stderr */
1061    close(i);
1062  }
1063}
1064#endif
1065
// NOTE(review): 'counter' is not referenced in the visible part of this file
// (distributeAsyncFunction uses its own function-local static) - confirm use.
1066uint64_t counter;
// Set by usr1Handler(); doStats() reads it to decide whether to report
// "no stats yet" and clears it afterwards.
1067bool statsWanted;
1068
1069void usr1Handler(int)
1070{
1071  statsWanted=true;
1072}
1073
1074void usr2Handler(int)
1075{
1076  SyncRes::setDefaultLogMode(SyncRes::Log);
1077  g_quiet=false;
1078  ::arg().set("quiet")="no";
1079
1080}
1081
1082void doStats(void)
1083{
1084  static time_t lastOutputTime;
1085  static uint64_t lastQueryCount;
1086 
1087  if(g_stats.qcounter && (t_RC->cacheHits + t_RC->cacheMisses) && SyncRes::s_queries && SyncRes::s_outqueries) {  // this only runs once thread 0 has had hits
1088    uint64_t cacheHits = broadcastAccFunction<uint64_t>(pleaseGetCacheHits);
1089    uint64_t cacheMisses = broadcastAccFunction<uint64_t>(pleaseGetCacheMisses);
1090   
1091    L<<Logger::Warning<<"stats: "<<g_stats.qcounter<<" questions, "<<
1092      broadcastAccFunction<uint64_t>(pleaseGetCacheSize)<< " cache entries, "<<
1093      broadcastAccFunction<uint64_t>(pleaseGetNegCacheSize)<<" negative entries, "<<
1094      (int)((cacheHits*100.0)/(cacheHits+cacheMisses))<<"% cache hits"<<endl; 
1095   
1096    L<<Logger::Warning<<"stats: throttle map: "
1097      << broadcastAccFunction<uint64_t>(pleaseGetThrottleSize) <<", ns speeds: "
1098      << broadcastAccFunction<uint64_t>(pleaseGetNsSpeedsSize)<<endl; 
1099    L<<Logger::Warning<<"stats: outpacket/query ratio "<<(int)(SyncRes::s_outqueries*100.0/SyncRes::s_queries)<<"%";
1100    L<<Logger::Warning<<", "<<(int)(SyncRes::s_throttledqueries*100.0/(SyncRes::s_outqueries+SyncRes::s_throttledqueries))<<"% throttled, "
1101     <<SyncRes::s_nodelegated<<" no-delegation drops"<<endl;
1102    L<<Logger::Warning<<"stats: "<<SyncRes::s_tcpoutqueries<<" outgoing tcp connections, "<<
1103      broadcastAccFunction<uint64_t>(pleaseGetConcurrentQueries)<<" queries running, "<<SyncRes::s_outgoingtimeouts<<" outgoing timeouts"<<endl;
1104
1105    //L<<Logger::Warning<<"stats: "<<g_stats.ednsPingMatches<<" ping matches, "<<g_stats.ednsPingMismatches<<" mismatches, "<<
1106      //g_stats.noPingOutQueries<<" outqueries w/o ping, "<< g_stats.noEdnsOutQueries<<" w/o EDNS"<<endl;
1107   
1108    L<<Logger::Warning<<"stats: " <<  broadcastAccFunction<uint64_t>(pleaseGetPacketCacheSize) <<
1109    " packet cache entries, "<<(int)(100.0*broadcastAccFunction<uint64_t>(pleaseGetPacketCacheHits)/SyncRes::s_queries) << "% packet cache hits"<<endl;
1110   
1111    time_t now = time(0);
1112    if(lastOutputTime && lastQueryCount && now != lastOutputTime) {
1113      L<<Logger::Warning<<"stats: "<< (SyncRes::s_queries - lastQueryCount) / (now - lastOutputTime) <<" qps (average over "<< (now - lastOutputTime) << " seconds)"<<endl;
1114    }
1115    lastOutputTime = now;
1116    lastQueryCount = SyncRes::s_queries;
1117  }
1118  else if(statsWanted) 
1119    L<<Logger::Warning<<"stats: no stats yet!"<<endl;
1120
1121  statsWanted=false;
1122}
1123
1124static void houseKeeping(void *)
1125try
1126{
1127  static __thread time_t last_stat, last_rootupdate, last_prune;
1128  static __thread int cleanCounter=0;
1129  struct timeval now;
1130  Utility::gettimeofday(&now, 0);
1131
1132  // clog<<"* "<<t_id<<" "<<(void*)&last_stat<<"\t"<<(unsigned int)last_stat<<endl;
1133
1134  if(now.tv_sec - last_prune > (time_t)(5 + t_id)) { 
1135    DTime dt;
1136    dt.setTimeval(now);
1137    t_RC->doPrune(); // this function is local to a thread, so fine anyhow
1138    t_packetCache->doPruneTo(::arg().asNum("max-packetcache-entries") / g_numThreads);
1139   
1140    pruneCollection(t_sstorage->negcache, ::arg().asNum("max-cache-entries") / (g_numThreads * 10), 200);
1141   
1142    if(!((cleanCounter++)%40)) {  // this is a full scan!
1143      time_t limit=now.tv_sec-300;
1144      for(SyncRes::nsspeeds_t::iterator i = t_sstorage->nsSpeeds.begin() ; i!= t_sstorage->nsSpeeds.end(); )
1145        if(i->second.stale(limit))
1146          t_sstorage->nsSpeeds.erase(i++);
1147        else
1148          ++i;
1149    }
1150//    L<<Logger::Warning<<"Spent "<<dt.udiff()/1000<<" msec cleaning"<<endl;
1151    last_prune=time(0);
1152  }
1153 
1154  if(!t_id) {
1155    if(now.tv_sec - last_stat > 1800) { 
1156      doStats();
1157      last_stat=time(0);
1158    }
1159  }
1160 
1161  if(now.tv_sec - last_rootupdate > 7200) {
1162    SyncRes sr(now);
1163    sr.setDoEDNS0(true);
1164    vector<DNSResourceRecord> ret;
1165
1166    sr.setNoCache();
1167    int res=sr.beginResolve(".", QType(QType::NS), 1, ret);
1168    if(!res) {
1169      L<<Logger::Warning<<"Refreshed . records"<<endl;
1170      last_rootupdate=now.tv_sec;
1171    }
1172    else
1173      L<<Logger::Error<<"Failed to update . records, RCODE="<<res<<endl;
1174  }
1175}
1176catch(AhuException& ae)
1177{
1178  L<<Logger::Error<<"Fatal error: "<<ae.reason<<endl;
1179  throw;
1180}
1181;
1182
1183void makeThreadPipes()
1184{
1185  for(unsigned int n=0; n < g_numThreads; ++n) {
1186    struct ThreadPipeSet tps;
1187    int fd[2];
1188    if(pipe(fd) < 0)
1189      unixDie("Creating pipe for inter-thread communications");
1190   
1191    tps.readToThread = fd[0];
1192    tps.writeToThread = fd[1];
1193   
1194    if(pipe(fd) < 0)
1195      unixDie("Creating pipe for inter-thread communications");
1196    tps.readFromThread = fd[0];
1197    tps.writeFromThread = fd[1];
1198   
1199    g_pipes.push_back(tps);
1200  }
1201}
1202
// Message passed between threads: a pointer to a heap-allocated ThreadMSG is
// written into the target thread's pipe, and handlePipeRequest() runs and
// deletes it on the receiving side.
1203struct ThreadMSG
1204{
1205  pipefunc_t func;   // the function the receiving thread must execute
1206  bool wantAnswer;   // when true, the receiver writes func's return pointer back to the sender
1207};
1208
1209void broadcastFunction(const pipefunc_t& func, bool skipSelf)
1210{
1211  unsigned int n = 0;
1212  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1213  {
1214    if(n++ == t_id) {
1215      if(!skipSelf)
1216        func(); // don't write to ourselves!
1217      continue;
1218    }
1219 
1220    ThreadMSG* tmsg = new ThreadMSG();
1221    tmsg->func = func;
1222    tmsg->wantAnswer = true;
1223    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1224      unixDie("write to thread pipe returned wrong size or error");
1225   
1226    string* resp;
1227    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1228      unixDie("read from thread pipe returned wrong size or error");
1229   
1230    if(resp) {
1231//      cerr <<"got response: " << *resp << endl;
1232      delete resp;
1233    }
1234  }
1235}
1236void distributeAsyncFunction(const pipefunc_t& func)
1237{
1238  static unsigned int counter;
1239  unsigned int target = 1 + (++counter % (g_pipes.size()-1));
1240  // cerr<<"Sending to: "<<target<<endl;
1241  if(target == t_id) {
1242    func();
1243    return;
1244  }
1245  ThreadPipeSet& tps = g_pipes[target];   
1246  ThreadMSG* tmsg = new ThreadMSG();
1247  tmsg->func = func;
1248  tmsg->wantAnswer = false;
1249 
1250  if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1251    unixDie("write to thread pipe returned wrong size or error");
1252   
1253}
1254
1255void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
1256{
1257  ThreadMSG* tmsg;
1258 
1259  if(read(fd, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // fd == readToThread
1260    unixDie("read from thread pipe returned wrong size or error");
1261  }
1262 
1263  void *resp = tmsg->func();
1264  if(tmsg->wantAnswer)
1265    if(write(g_pipes[t_id].writeFromThread, &resp, sizeof(resp)) != sizeof(resp))
1266      unixDie("write to thread pipe returned wrong size or error");
1267 
1268  delete tmsg;
1269}
1270
1271template<class T> void *voider(const boost::function<T*()>& func)
1272{
1273  return func();
1274}
1275
1276vector<ComboAddress>& operator+=(vector<ComboAddress>&a, const vector<ComboAddress>& b)
1277{
1278  a.insert(a.end(), b.begin(), b.end());
1279  return a;
1280}
1281
1282template<class T> T broadcastAccFunction(const boost::function<T*()>& func, bool skipSelf)
1283{
1284  unsigned int n = 0;
1285  T ret=T();
1286  BOOST_FOREACH(ThreadPipeSet& tps, g_pipes) 
1287  {
1288    if(n++ == t_id) {
1289      if(!skipSelf) {
1290        T* resp = (T*)func(); // don't write to ourselves!
1291        if(resp) {
1292          //~ cerr <<"got direct: " << *resp << endl;
1293          ret += *resp;
1294          delete resp;
1295        }
1296      }
1297      continue;
1298    }
1299     
1300    ThreadMSG* tmsg = new ThreadMSG();
1301    tmsg->func = boost::bind(voider<T>, func);
1302    tmsg->wantAnswer = true;
1303 
1304    if(write(tps.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg))
1305      unixDie("write to thread pipe returned wrong size or error");
1306 
1307   
1308    T* resp;
1309    if(read(tps.readFromThread, &resp, sizeof(resp)) != sizeof(resp))
1310      unixDie("read from thread pipe returned wrong size or error");
1311   
1312    if(resp) {
1313      //~ cerr <<"got response: " << *resp << endl;
1314      ret += *resp;
1315      delete resp;
1316    }
1317  }
1318  return ret;
1319}
1320
// Explicit instantiations of broadcastAccFunction for the three result types
// listed here: string, uint64_t and vector<ComboAddress>.
1321template string broadcastAccFunction(const boost::function<string*()>& fun, bool skipSelf); // explicit instantiation
1322template uint64_t broadcastAccFunction(const boost::function<uint64_t*()>& fun, bool skipSelf); // explicit instantiation
1323template vector<ComboAddress> broadcastAccFunction(const boost::function<vector<ComboAddress> *()>& fun, bool skipSelf); // explicit instantiation
1324
1325void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
1326{
1327  string remote;
1328  string msg=s_rcc.recv(&remote);
1329  RecursorControlParser rcp;
1330  RecursorControlParser::func_t* command;
1331 
1332  string answer=rcp.getAnswer(msg, &command);
1333  try {
1334    s_rcc.send(answer, &remote);
1335    command();
1336  }
1337  catch(std::exception& e) {
1338    L<<Logger::Error<<"Error dealing with control socket request: "<<e.what()<<endl;
1339  }
1340  catch(AhuException& ae) {
1341    L<<Logger::Error<<"Error dealing with control socket request: "<<ae.reason<<endl;
1342  }
1343}
1344
1345void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
1346{
1347  PacketID* pident=any_cast<PacketID>(&var);
1348  //  cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
1349
1350  shared_array<char> buffer(new char[pident->inNeeded]);
1351
1352  int ret=recv(fd, buffer.get(), pident->inNeeded,0);
1353  if(ret > 0) {
1354    pident->inMSG.append(&buffer[0], &buffer[ret]);
1355    pident->inNeeded-=ret;
1356    if(!pident->inNeeded) {
1357      //      cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
1358      PacketID pid=*pident;
1359      string msg=pident->inMSG;
1360     
1361      t_fdm->removeReadFD(fd);
1362      MT->sendEvent(pid, &msg); 
1363    }
1364    else {
1365      //      cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
1366    }
1367  }
1368  else {
1369    PacketID tmp=*pident;
1370    t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
1371    string empty;
1372    MT->sendEvent(tmp, &empty); // this conveys error status
1373  }
1374}
1375
1376void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
1377{
1378  PacketID* pid=any_cast<PacketID>(&var);
1379  int ret=send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
1380  if(ret > 0) {
1381    pid->outPos+=ret;
1382    if(pid->outPos==pid->outMSG.size()) {
1383      PacketID tmp=*pid;
1384      t_fdm->removeWriteFD(fd);
1385      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
1386    }
1387  }
1388  else {  // error or EOF
1389    PacketID tmp(*pid);
1390    t_fdm->removeWriteFD(fd);
1391    string sent;
1392    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
1393  }
1394}
1395
1396// resend event to everybody chained onto it
1397void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
1398{
1399  if(iter->key.chain.empty())
1400    return;
1401  //  cerr<<"doResends called!\n";
1402  for(PacketID::chain_t::iterator i=iter->key.chain.begin(); i != iter->key.chain.end() ; ++i) {
1403    resend.fd=-1;
1404    resend.id=*i;
1405    //    cerr<<"\tResending "<<content.size()<<" bytes for fd="<<resend.fd<<" and id="<<resend.id<<endl;
1406
1407    MT->sendEvent(resend, &content);
1408    g_stats.chainResends++;
1409  }
1410}
1411
// Multiplexer callback for an outgoing-query UDP socket: read one datagram
// and deliver it to the MTasker thread waiting for it.
//
// fd  - the client socket that became readable
// var - holds the PacketID that was registered together with fd
//
// Datagrams shorter than a DNS header (or a recvfrom error) are reported to
// the waiter, and to everybody chained onto it, as an empty string. Packets
// with the QR bit clear are logged and ignored.
1412void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
1413{
1414  PacketID pid=any_cast<PacketID>(var);
1415  int len;
1416  char data[1500];
1417  ComboAddress fromaddr;
1418  socklen_t addrlen=sizeof(fromaddr);
1419
1420  len=recvfrom(fd, data, sizeof(data), 0, (sockaddr *)&fromaddr, &addrlen);
1421
  // Error path: covers both recvfrom failure (len < 0) and truncated packets.
1422  if(len < (int)sizeof(dnsheader)) {
1423    if(len < 0)
1424      ; //      cerr<<"Error on fd "<<fd<<": "<<stringerror()<<"\n";
1425    else {
1426      g_stats.serverParseError++; 
1427      if(g_logCommonErrors)
1428        L<<Logger::Error<<"Unable to parse packet from remote UDP server "<< fromaddr.toString() <<
1429          ": packet smalller than DNS header"<<endl;
1430    }
1431
1432    t_udpclientsocks->returnSocket(fd);
1433    string empty;
1434
1435    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
1436    if(iter != MT->d_waiters.end()) 
1437      doResends(iter, pid, empty);
1438   
1439    MT->sendEvent(pid, &empty); // this denotes error (does lookup again.. at least L1 will be hot)
1440    return;
1441  } 
1442
  // Copy the header out of the raw buffer before inspecting its fields.
1443  dnsheader dh;
1444  memcpy(&dh, data, sizeof(dh));
1445 
1446  if(dh.qr) {
    // Build the lookup key from what the packet itself says (sender, id,
    // socket, question name and type) rather than from the registered 'pid'.
1447    PacketID pident;
1448    pident.remote=fromaddr;
1449    pident.id=dh.id;
1450    pident.fd=fd;
1451    if(!dh.qdcount) { // UPC, Nominum, very old BIND on FormErr, NSD
1452      pident.domain.clear();
1453      pident.type = 0;
1454    }
1455    else {
1456      try {
1457        pident.domain=questionExpand(data, len, pident.type); // don't copy this from above - we need to do the actual read
1458      }
1459      catch(std::exception& e) {
1460        g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
1461        L<<Logger::Warning<<"Error in packet from "<< fromaddr.toStringWithPort() << ": "<<e.what() << endl;
        // NOTE(review): this early return does not call returnSocket(fd) - confirm that is intended.
1462        return;
1463      }
1464    }
1465    string packet;
1466    packet.assign(data, len);
1467
    // Queries chained onto the matching waiter get the packet first.
1468    MT_t::waiters_t::iterator iter=MT->d_waiters.find(pident);
1469    if(iter != MT->d_waiters.end()) {
1470      doResends(iter, pident, packet);
1471    }
1472
    // Target of the goto below: delivery is retried once pident has been
    // completed with the name and type of a plausible waiter.
1473  retryWithName:
1474
1475    if(!MT->sendEvent(pident, &packet)) {
1476      // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
1477      for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
        // Same socket, sender, type and name as a waiter (the id is not compared): count a near miss.
1478        if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
1479           pdns_iequals(pident.domain, mthread->key.domain)) {
1480          mthread->key.nearMisses++;
1481        }
1482
1483        // be a bit paranoid here since we're weakening our matching
        // After the assignments below pident.domain is no longer empty, so
        // this branch cannot be taken again for the same packet.
1484        if(pident.domain.empty() && !mthread->key.domain.empty() && !pident.type && mthread->key.type && 
1485           pident.id  == mthread->key.id && mthread->key.remote == pident.remote) {
1486          // cerr<<"Empty response, rest matches though, sending to a waiter"<<endl;
1487          pident.domain = mthread->key.domain;
1488          pident.type = mthread->key.type;
1489          goto retryWithName; // note that this only passes on an error, lwres will still reject the packet
1490        }
1491      }
1492      g_stats.unexpectedCount++; // if we made it here, it really is an unexpected answer
1493      if(g_logCommonErrors) {
1494        L<<Logger::Warning<<"Discarding unexpected packet from "<<fromaddr.toStringWithPort()<<": "<<pident.domain<<", "<<pident.type<<", "<<MT->d_waiters.size()<<" waiters"<<endl;
1495      }
1496    }
    // Delivered to a waiter: the socket is handed back to the pool.
1497    else if(fd >= 0) {
1498      t_udpclientsocks->returnSocket(fd);
1499    }
1500  }
1501  else
1502    L<<Logger::Warning<<"Ignoring question on outgoing socket from "<< fromaddr.toStringWithPort()  <<endl;
1503}
1504
1505FDMultiplexer* getMultiplexer()
1506{
1507  FDMultiplexer* ret;
1508  for(FDMultiplexer::FDMultiplexermap_t::const_iterator i = FDMultiplexer::getMultiplexerMap().begin();
1509      i != FDMultiplexer::getMultiplexerMap().end(); ++i) {
1510    try {
1511      ret=i->second();
1512      return ret;
1513    }
1514    catch(FDMultiplexerException &fe) {
1515      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer ("<<fe.what()<<"), falling back"<<endl;
1516    }
1517    catch(...) {
1518      L<<Logger::Error<<"Non-fatal error initializing possible multiplexer"<<endl;
1519    }
1520  }
1521  L<<Logger::Error<<"No working multiplexer found!"<<endl;
1522  exit(1);
1523}
1524
1525 
1526string* doReloadLuaScript()
1527{
1528  string fname= ::arg()["lua-dns-script"];
1529  try {
1530    if(fname.empty()) {
1531      t_pdl->reset();
1532      L<<Logger::Error<<t_id<<" Unloaded current lua script"<<endl;
1533      return new string("unloaded\n");
1534    }
1535    else {
1536      *t_pdl = shared_ptr<RecursorLua>(new RecursorLua(fname));
1537    }
1538  }
1539  catch(std::exception& e) {
1540    L<<Logger::Error<<t_id<<" Retaining current script, error from '"<<fname<<"': "<< e.what() <<endl;
1541    return new string("retaining current script, error from '"+fname+"': "+e.what()+"\n");
1542  }
1543   
1544  L<<Logger::Warning<<t_id<<" (Re)loaded lua script from '"<<fname<<"'"<<endl;
1545  return new string("(re)loaded '"+fname+"'\n");
1546}
1547
1548string doQueueReloadLuaScript(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1549{
1550  if(begin != end) 
1551    ::arg().set("lua-dns-script") = *begin;
1552 
1553  return broadcastAccFunction<string>(doReloadLuaScript);
1554} 
1555
1556string* pleaseUseNewTraceRegex(const std::string& newRegex)
1557try
1558{
1559  if(newRegex.empty()) {
1560    t_traceRegex->reset();
1561    return new string("unset\n");
1562  }
1563  else {
1564    (*t_traceRegex) = shared_ptr<Regex>(new Regex(newRegex));
1565    return new string("ok\n");
1566  }
1567}
1568catch(AhuException& ae)
1569{
1570  return new string(ae.reason+"\n");
1571}
1572
1573string doTraceRegex(vector<string>::const_iterator begin, vector<string>::const_iterator end)
1574{
1575  return broadcastAccFunction<string>(boost::bind(pleaseUseNewTraceRegex, begin!=end ? *begin : ""));
1576}
1577
1578
// Forward declaration of the thread entry point; it is defined further down
// in this file and started from serviceMain().
1579void* recursorThread(void*);
1580
1581void* pleaseSupplantACLs(NetmaskGroup *ng)
1582{
1583  t_allowFrom = ng;
1584  return 0;
1585}
1586
// Saved command line; parseACLs() passes these to ::arg().preParse() when it
// re-reads the ACL settings. Where they are assigned is not visible here.
1587int g_argc;
1588char** g_argv;
1589
1590void parseACLs()
1591{
1592  static bool l_initialized;
1593 
1594  if(l_initialized) { // only reload configuration file on second call
1595    string configname=::arg()["config-dir"]+"/recursor.conf";
1596    cleanSlashes(configname);
1597   
1598    if(!::arg().preParseFile(configname.c_str(), "allow-from-file")) 
1599      L<<Logger::Warning<<"Unable to re-parse configuration file '"<<configname<<"'"<<endl;
1600    ::arg().preParse(g_argc, g_argv, "allow-from-file");
1601    ::arg().preParseFile(configname.c_str(), "allow-from", LOCAL_NETS);
1602    ::arg().preParse(g_argc, g_argv, "allow-from");
1603  }
1604
1605  NetmaskGroup* oldAllowFrom = t_allowFrom, *allowFrom=new NetmaskGroup;
1606 
1607  if(!::arg()["allow-from-file"].empty()) {
1608    string line;
1609    ifstream ifs(::arg()["allow-from-file"].c_str());
1610    if(!ifs) {
1611      delete allowFrom; 
1612      throw runtime_error("Could not open '"+::arg()["allow-from-file"]+"': "+stringerror());
1613    }
1614
1615    string::size_type pos;
1616    while(getline(ifs,line)) {
1617      pos=line.find('#');
1618      if(pos!=string::npos)
1619        line.resize(pos);
1620      trim(line);
1621      if(line.empty())
1622        continue;
1623
1624      allowFrom->addMask(line);
1625    }
1626    L<<Logger::Warning<<"Done parsing " << allowFrom->size() <<" allow-from ranges from file '"<<::arg()["allow-from-file"]<<"' - overriding 'allow-from' setting"<<endl;
1627  }
1628  else if(!::arg()["allow-from"].empty()) {
1629    vector<string> ips;
1630    stringtok(ips, ::arg()["allow-from"], ", ");
1631   
1632    L<<Logger::Warning<<"Only allowing queries from: ";
1633    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1634      allowFrom->addMask(*i);
1635      if(i!=ips.begin())
1636        L<<Logger::Warning<<", ";
1637      L<<Logger::Warning<<*i;
1638    }
1639    L<<Logger::Warning<<endl;
1640  }
1641  else {
1642    if(::arg()["local-address"]!="127.0.0.1" && ::arg().asNum("local-port")==53) 
1643      L<<Logger::Error<<"WARNING: Allowing queries from all IP addresses - this can be a security risk!"<<endl;
1644    delete allowFrom;
1645    allowFrom = 0;
1646  }
1647 
1648  g_initialAllowFrom = allowFrom;
1649  broadcastFunction(boost::bind(pleaseSupplantACLs, allowFrom));
1650  delete oldAllowFrom;
1651 
1652  l_initialized = true;
1653}
1654
1655int serviceMain(int argc, char*argv[])
1656{
1657  L.setName("pdns_recursor");
1658
1659  L.setLoglevel((Logger::Urgency)(6)); // info and up
1660
1661  if(!::arg()["logging-facility"].empty()) {
1662    int val=logFacilityToLOG(::arg().asNum("logging-facility") );
1663    if(val >= 0)
1664      theL().setFacility(val);
1665    else
1666      L<<Logger::Error<<"Unknown logging facility "<<::arg().asNum("logging-facility") <<endl;
1667  }
1668
1669  L<<Logger::Warning<<"PowerDNS recursor "<<VERSION<<" (C) 2001-2013 PowerDNS.COM BV ("<<__DATE__", "__TIME__;
1670#ifdef __GNUC__
1671  L<<", gcc "__VERSION__;
1672#endif // add other compilers here
1673#ifdef _MSC_VER
1674  L<<", MSVC "<<_MSC_VER;
1675#endif
1676  L<<") starting up"<<endl;
1677 
1678  L<<Logger::Warning<<"PowerDNS comes with ABSOLUTELY NO WARRANTY. "
1679    "This is free software, and you are welcome to redistribute it "
1680    "according to the terms of the GPL version 2."<<endl;
1681 
1682  L<<Logger::Warning<<"Operating in "<<(sizeof(unsigned long)*8) <<" bits mode"<<endl;
1683 
1684  #if 0
1685  unsigned int maxFDs, curFDs;
1686  getFDLimits(curFDs, maxFDs);
1687  if(curFDs < 2048)
1688    L<<Logger::Warning<<"Only "<<curFDs<<" file descriptors available (out of: "<<maxFDs<<"), may not be suitable for high performance"<<endl;
1689  #endif
1690 
1691  seedRandom(::arg()["entropy-source"]);
1692
1693  parseACLs();
1694 
1695  if(!::arg()["dont-query"].empty()) {
1696    g_dontQuery=new NetmaskGroup;
1697    vector<string> ips;
1698    stringtok(ips, ::arg()["dont-query"], ", ");
1699    ips.push_back("0.0.0.0");
1700    ips.push_back("::");
1701
1702    L<<Logger::Warning<<"Will not send queries to: ";
1703    for(vector<string>::const_iterator i = ips.begin(); i!= ips.end(); ++i) {
1704      g_dontQuery->addMask(*i);
1705      if(i!=ips.begin())
1706        L<<Logger::Warning<<", ";
1707      L<<Logger::Warning<<*i;
1708    }
1709    L<<Logger::Warning<<endl;
1710  }
1711
1712  g_quiet=::arg().mustDo("quiet");
1713  g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries");
1714  if(g_weDistributeQueries) {
1715      L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl;
1716  }
1717 
1718  if(::arg()["trace"]=="fail") {
1719    SyncRes::setDefaultLogMode(SyncRes::Store);
1720  }
1721  else if(::arg().mustDo("trace")) {
1722    SyncRes::setDefaultLogMode(SyncRes::Log);
1723    ::arg().set("quiet")="no";
1724    g_quiet=false;
1725  }
1726 
1727 
1728  try {
1729    vector<string> addrs; 
1730    if(!::arg()["query-local-address6"].empty()) {
1731      SyncRes::s_doIPv6=true;
1732      L<<Logger::Warning<<"Enabling IPv6 transport for outgoing queries"<<endl;
1733     
1734      stringtok(addrs, ::arg()["query-local-address6"], ", ;");
1735      BOOST_FOREACH(const string& addr, addrs) {
1736        g_localQueryAddresses6.push_back(ComboAddress(addr));
1737      }
1738    }
1739    else {
1740      L<<Logger::Warning<<"NOT using IPv6 for outgoing queries - set 'query-local-address6=::' to enable"<<endl;
1741    }
1742    addrs.clear();
1743    stringtok(addrs, ::arg()["query-local-address"], ", ;");
1744    BOOST_FOREACH(const string& addr, addrs) {
1745      g_localQueryAddresses4.push_back(ComboAddress(addr));
1746    }
1747  }
1748  catch(std::exception& e) {
1749    L<<Logger::Error<<"Assigning local query addresses: "<<e.what();
1750    exit(99);
1751  }
1752
1753  SyncRes::s_doAAAAAdditionalProcessing = ::arg().mustDo("aaaa-additional-processing");
1754  SyncRes::s_doAdditionalProcessing = ::arg().mustDo("additional-processing") | SyncRes::s_doAAAAAdditionalProcessing;
1755 
1756  SyncRes::s_noEDNSPing = ::arg().mustDo("disable-edns-ping");
1757  SyncRes::s_noEDNS = ::arg().mustDo("disable-edns");
1758
1759  SyncRes::s_nopacketcache = ::arg().mustDo("disable-packetcache");
1760
1761  SyncRes::s_maxnegttl=::arg().asNum("max-negative-ttl");
1762  SyncRes::s_maxcachettl=::arg().asNum("max-cache-ttl");
1763  SyncRes::s_packetcachettl=::arg().asNum("packetcache-ttl");
1764  SyncRes::s_packetcacheservfailttl=::arg().asNum("packetcache-servfail-ttl");
1765  SyncRes::s_serverID=::arg()["server-id"];
1766  if(SyncRes::s_serverID.empty()) {
1767    char tmp[128];
1768    gethostname(tmp, sizeof(tmp)-1);
1769    SyncRes::s_serverID=tmp;
1770  }
1771 
1772  g_networkTimeoutMsec = ::arg().asNum("network-timeout");
1773
1774  g_initialDomainMap = parseAuthAndForwards();
1775 
1776   
1777  g_logCommonErrors=::arg().mustDo("log-common-errors");
1778 
1779  makeUDPServerSockets();
1780  makeTCPServerSockets();
1781
1782  int forks;
1783  for(forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) {
1784    if(!fork()) // we are child
1785      break;
1786  }
1787 
1788  s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid";
1789  if(!s_pidfname.empty())
1790    unlink(s_pidfname.c_str()); // remove possible old pid file
1791 
1792#ifndef WIN32
1793  if(::arg().mustDo("daemon")) {
1794    L<<Logger::Warning<<"Calling daemonize, going to background"<<endl;
1795    L.toConsole(Logger::Critical);
1796    daemonize();
1797  }
1798  signal(SIGUSR1,usr1Handler);
1799  signal(SIGUSR2,usr2Handler);
1800  signal(SIGPIPE,SIG_IGN);
1801  writePid();
1802#endif
1803  makeControlChannelSocket( ::arg().asNum("processes") > 1 ? forks : -1);
1804 
1805  int newgid=0;
1806  if(!::arg()["setgid"].empty())
1807    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
1808  int newuid=0;
1809  if(!::arg()["setuid"].empty())
1810    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
1811
1812  if (!::arg()["chroot"].empty()) {
1813    if (chroot(::arg()["chroot"].c_str())<0 || chdir("/") < 0) {
1814      L<<Logger::Error<<"Unable to chroot to '"+::arg()["chroot"]+"': "<<strerror (errno)<<", exiting"<<endl;
1815      exit(1);
1816    }
1817  }
1818
1819  Utility::dropPrivs(newuid, newgid);
1820  g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries");
1821 
1822  makeThreadPipes();
1823 
1824  g_tcpTimeout=::arg().asNum("client-tcp-timeout");
1825  g_maxTCPPerClient=::arg().asNum("max-tcp-per-client");
1826  g_maxMThreads=::arg().asNum("max-mthreads");
1827
1828  if(g_numThreads == 1) {
1829    L<<Logger::Warning<<"Operating unthreaded"<<endl;
1830    recursorThread(0);
1831  }
1832  else {
1833    pthread_t tid;
1834    L<<Logger::Warning<<"Launching "<< g_numThreads <<" threads"<<endl;
1835    for(unsigned int n=0; n < g_numThreads; ++n) {
1836      pthread_create(&tid, 0, recursorThread, (void*)(long)n);
1837    }
1838    void* res;
1839
1840   
1841    pthread_join(tid, &res);
1842  }
1843  return 0;
1844}
1845
1846void* recursorThread(void* ptr)
1847try
1848{
1849  t_id=(int) (long) ptr;
1850  SyncRes tmp(g_now); // make sure it allocates tsstorage before we do anything, like primeHints or so..
1851  t_sstorage->domainmap = g_initialDomainMap;
1852  t_allowFrom = g_initialAllowFrom;
1853  t_udpclientsocks = new UDPClientSocks();
1854  t_tcpClientCounts = new tcpClientCounts_t();
1855  primeHints();
1856 
1857  t_packetCache = new RecursorPacketCache();
1858 
1859  L<<Logger::Warning<<"Done priming cache with root hints"<<endl;
1860   
1861  t_pdl = new shared_ptr<RecursorLua>();
1862 
1863  try {
1864    if(!::arg()["lua-dns-script"].empty()) {
1865      *t_pdl = shared_ptr<RecursorLua>(new RecursorLua(::arg()["lua-dns-script"]));
1866      L<<Logger::Warning<<"Loaded 'lua' script from '"<<::arg()["lua-dns-script"]<<"'"<<endl;
1867    }
1868   
1869  }
1870  catch(std::exception &e) {
1871    L<<Logger::Error<<"Failed to load 'lua' script from '"<<::arg()["lua-dns-script"]<<"': "<<e.what()<<endl;
1872    exit(99);
1873  }
1874 
1875  t_traceRegex = new shared_ptr<Regex>();
1876 
1877 
1878  t_remotes = new RemoteKeeper();
1879  t_remotes->remotes.resize(::arg().asNum("remotes-ringbuffer-entries") / g_numThreads); 
1880 
1881  if(!t_remotes->remotes.empty())
1882    memset(&t_remotes->remotes[0], 0, t_remotes->remotes.size() * sizeof(RemoteKeeper::remotes_t::value_type));
1883 
1884 
1885  MT=new MTasker<PacketID,string>(::arg().asNum("stack-size"));
1886 
1887  PacketID pident;
1888
1889  t_fdm=getMultiplexer();
1890  if(!t_id) {
1891    if(::arg().mustDo("experimental-json-interface")) {
1892      L<<Logger::Warning << "Enabling JSON interface" << endl;
1893      new JWebserver(t_fdm);
1894    }
1895    L<<Logger::Error<<"Enabled '"<< t_fdm->getName() << "' multiplexer"<<endl;
1896  }
1897
1898  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest);
1899
1900  if(!g_weDistributeQueries || !t_id)  // if we distribute queries, only t_id = 0 listens
1901    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i) 
1902      t_fdm->addReadFD(i->first, i->second);
1903 
1904  if(!t_id) {
1905    t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel
1906  }
1907
1908  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients");
1909 
1910  bool listenOnTCP(true);
1911
1912  counter=0; // used to periodically execute certain tasks
1913  for(;;) {
1914    while(MT->schedule(&g_now)); // MTasker letting the mthreads do their thing
1915     
1916    if(!(counter%500)) {
1917      MT->makeThread(houseKeeping, 0);
1918    }
1919
1920    if(!(counter%55)) {
1921      typedef vector<pair<int, FDMultiplexer::funcparam_t> > expired_t;
1922      expired_t expired=t_fdm->getTimeouts(g_now);
1923       
1924      for(expired_t::iterator i=expired.begin() ; i != expired.end(); ++i) {
1925        shared_ptr<TCPConnection> conn=any_cast<shared_ptr<TCPConnection> >(i->second);
1926        if(g_logCommonErrors)
1927          L<<Logger::Warning<<"Timeout from remote TCP client "<< conn->d_remote.toString() <<endl;
1928        t_fdm->removeReadFD(i->first);
1929      }
1930    }
1931     
1932    counter++;
1933
1934    if(!t_id && statsWanted) {
1935      doStats();
1936    }
1937
1938    Utility::gettimeofday(&g_now, 0);
1939    t_fdm->run(&g_now);
1940    // 'run' updates g_now for us
1941
1942    if(listenOnTCP) {
1943      if(TCPConnection::getCurrentConnections() > maxTcpClients) {  // shutdown, too many connections
1944        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1945          t_fdm->removeReadFD(*i);
1946        listenOnTCP=false;
1947      }
1948    }
1949    else {
1950      if(TCPConnection::getCurrentConnections() <= maxTcpClients) {  // reenable
1951        for(tcpListenSockets_t::iterator i=g_tcpListenSockets.begin(); i != g_tcpListenSockets.end(); ++i)
1952          t_fdm->addReadFD(*i, handleNewTCPQuestion);
1953        listenOnTCP=true;
1954      }
1955    }
1956  }
1957}
1958catch(AhuException &ae) {
1959  L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
1960  return 0;
1961}
1962catch(std::exception &e) {
1963   L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
1964   return 0;
1965}
1966catch(...) {
1967   L<<Logger::Error<<"any other exception in main: "<<endl;
1968   return 0;
1969}
1970
1971#ifdef WIN32
1972void doWindowsServiceArguments(RecursorService& recursor)
1973{
1974  if(::arg().mustDo( "register-service" )) {
1975    if ( !recursor.registerService( "The PowerDNS Recursor.", true )) {
1976      cerr << "Could not register service." << endl;
1977      exit( 99 );
1978    }
1979   
1980    exit( 0 );
1981  }
1982
1983  if ( ::arg().mustDo( "unregister-service" )) {
1984    recursor.unregisterService();
1985    exit( 0 );
1986  }
1987}
1988#endif
1989
1990
1991int main(int argc, char **argv) 
1992{
1993  g_argc = argc;
1994  g_argv = argv;
1995  g_stats.startupTime=time(0);
1996  reportBasicTypes();
1997  reportOtherTypes();
1998
1999  int ret = EXIT_SUCCESS;
2000#ifdef WIN32
2001  RecursorService service;
2002  WSADATA wsaData;
2003  if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData )) {
2004    cerr<<"Unable to initialize winsock\n";
2005    exit(1);
2006  }
2007#endif // WIN32
2008
2009  try {
2010    ::arg().set("stack-size","stack size per mthread")="200000";
2011    ::arg().set("soa-minimum-ttl","Don't change")="0";
2012    ::arg().set("soa-serial-offset","Don't change")="0";
2013    ::arg().set("no-shuffle","Don't change")="off";
2014    ::arg().set("additional-processing","turn on to do additional processing")="off";
2015    ::arg().set("aaaa-additional-processing","turn on to do AAAA additional processing (slow)")="off";
2016    ::arg().set("local-port","port to listen on")="53";
2017    ::arg().set("local-address","IP addresses to listen on, separated by spaces or commas. Also accepts ports.")="127.0.0.1";
2018    ::arg().set("trace","if we should output heaps of logging. set to 'fail' to only log failing domains")="off";
2019    ::arg().set("daemon","Operate as a daemon")="yes";
2020    ::arg().set("log-common-errors","If we should log rather common errors")="yes";
2021    ::arg().set("chroot","switch to chroot jail")="";
2022    ::arg().set("setgid","If set, change group id to this gid for more security")="";
2023    ::arg().set("setuid","If set, change user id to this uid for more security")="";
2024    ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500";
2025    ::arg().set("threads", "Launch this number of threads")="2";
2026    ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1";
2027#ifdef WIN32
2028    ::arg().set("quiet","Suppress logging of questions and answers")="off";
2029    ::arg().setSwitch( "register-service", "Register the service" )= "no";
2030    ::arg().setSwitch( "unregister-service", "Unregister the service" )= "no";
2031    ::arg().setSwitch( "ntservice", "Run as service" )= "no";
2032    ::arg().setSwitch( "use-ntlog", "Use the NT logging facilities" )= "yes"; 
2033    ::arg().setSwitch( "use-logfile", "Use a log file" )= "no"; 
2034#else
2035    ::arg().set( "experimental-logfile", "Filename of the log file for JSON parser" )= "/var/log/pdns.log"; 
2036    ::arg().setSwitch( "experimental-json-interface", "If we should run a JSON webserver") = "no";
2037    ::arg().set("quiet","Suppress logging of questions and answers")="";
2038    ::arg().set("logging-facility","Facility to log messages as. 0 corresponds to local0")="";
2039#endif
2040    ::arg().set("config-dir","Location of configuration directory (recursor.conf)")=SYSCONFDIR;
2041#ifndef WIN32
2042    ::arg().set("socket-owner","Owner of socket")="";
2043    ::arg().set("socket-group","Group of socket")="";
2044    ::arg().set("socket-mode", "Permissions for socket")="";
2045#endif
2046   
2047    ::arg().set("socket-dir","Where the controlsocket will live")=LOCALSTATEDIR;
2048    ::arg().set("delegation-only","Which domains we only accept delegations from")="";
2049    ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
2050    ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
2051    ::arg().set("client-tcp-timeout","Timeout in seconds when talking to TCP clients")="2";
2052    ::arg().set("max-mthreads", "Maximum number of simultaneous Mtasker threads")="2048";
2053    ::arg().set("max-tcp-clients","Maximum number of simultaneous TCP clients")="128";
2054    ::arg().set("hint-file", "If set, load root hints from this file")="";
2055    ::arg().set("max-cache-entries", "If set, maximum number of entries in the main cache")="1000000";
2056    ::arg().set("max-negative-ttl", "maximum number of seconds to keep a negative cached entry in memory")="3600";
2057    ::arg().set("max-cache-ttl", "maximum number of seconds to keep a cached entry in memory")="86400";
2058    ::arg().set("packetcache-ttl", "maximum number of seconds to keep a cached entry in packetcache")="3600";
2059    ::arg().set("max-packetcache-entries", "maximum number of entries to keep in the packetcache")="500000";
2060    ::arg().set("packetcache-servfail-ttl", "maximum number of seconds to keep a cached servfail entry in packetcache")="60";
2061    ::arg().set("server-id", "Returned when queried for 'server.id' TXT or NSID, defaults to hostname")="";
2062    ::arg().set("remotes-ringbuffer-entries", "maximum number of packets to store statistics for")="0";
2063    ::arg().set("version-string", "string reported on version.pdns or version.bind")="PowerDNS Recursor "VERSION" $Id$";
2064    ::arg().set("allow-from", "If set, only allow these comma separated netmasks to recurse")=LOCAL_NETS;
2065    ::arg().set("allow-from-file", "If set, load allowed netmasks from this file")="";
2066    ::arg().set("entropy-source", "If set, read entropy from this file")="/dev/urandom";
2067    ::arg().set("dont-query", "If set, do not query these netmasks for DNS data")=LOCAL_NETS; 
2068    ::arg().set("max-tcp-per-client", "If set, maximum number of TCP sessions per client (IP address)")="0";
2069    ::arg().set("spoof-nearmiss-max", "If non-zero, assume spoofing after this many near misses")="20";
2070    ::arg().set("single-socket", "If set, only use a single socket for outgoing queries")="off";
2071    ::arg().set("auth-zones", "Zones for which we have authoritative data, comma separated domain=file pairs ")="";
2072    ::arg().set("forward-zones", "Zones for which we forward queries, comma separated domain=ip pairs")="";
2073    ::arg().set("forward-zones-recurse", "Zones for which we forward queries with recursion bit, comma separated domain=ip pairs")="";
2074    ::arg().set("forward-zones-file", "File with (+)domain=ip pairs for forwarding")="";
2075    ::arg().set("export-etc-hosts", "If we should serve up contents from /etc/hosts")="off";
2076    ::arg().set("export-etc-hosts-search-suffix", "Also serve up the contents of /etc/hosts with this suffix")="";
2077    ::arg().set("etc-hosts-file", "Path to 'hosts' file")="/etc/hosts";
2078    ::arg().set("serve-rfc1918", "If we should be authoritative for RFC 1918 private IP space")="";
2079    ::arg().set("lua-dns-script", "Filename containing an optional 'lua' script that will be used to modify dns answers")="";
2080    ::arg().setSwitch( "ignore-rd-bit", "Assume each packet requires recursion, for compatibility" )= "off"; 
2081    ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no"; 
2082    ::arg().setSwitch( "disable-edns", "Disable EDNS" )= ""; 
2083    ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no"; 
2084    ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no";
2085   
2086
2087    ::arg().setCmd("help","Provide a helpful message");
2088    ::arg().setCmd("version","Print version string ("VERSION")");
2089    ::arg().setCmd("config","Output blank configuration");
2090    L.toConsole(Logger::Info);
2091    ::arg().laxParse(argc,argv); // do a lax parse
2092
2093    if(::arg().mustDo("config")) {
2094      cout<<::arg().configstring()<<endl;
2095      exit(0);
2096    }
2097
2098
2099    string configname=::arg()["config-dir"]+"/recursor.conf";
2100    cleanSlashes(configname);
2101
2102    if(!::arg().file(configname.c_str())) 
2103      L<<Logger::Warning<<"Unable to parse configuration file '"<<configname<<"'"<<endl;
2104
2105    ::arg().parse(argc,argv);
2106
2107    ::arg().set("delegation-only")=toLower(::arg()["delegation-only"]);
2108
2109    if(::arg().mustDo("help")) {
2110      cerr<<"syntax:"<<endl<<endl;
2111      cerr<<::arg().helpstring(::arg()["help"])<<endl;
2112      exit(99);
2113    }
2114    if(::arg().mustDo("version")) {
2115      cerr<<"version: "VERSION<<endl;
2116      exit(99);
2117    }
2118
2119#ifndef WIN32
2120    serviceMain(argc, argv);
2121#else
2122    doWindowsServiceArguments(service);
2123        L.toNTLog();
2124    RecursorService::instance()->start( argc, argv, ::arg().mustDo( "ntservice" )); 
2125#endif
2126
2127  }
2128  catch(AhuException &ae) {
2129    L<<Logger::Error<<"Exception: "<<ae.reason<<endl;
2130    ret=EXIT_FAILURE;
2131  }
2132  catch(std::exception &e) {
2133    L<<Logger::Error<<"STL Exception: "<<e.what()<<endl;
2134    ret=EXIT_FAILURE;
2135  }
2136  catch(...) {
2137    L<<Logger::Error<<"any other exception in main: "<<endl;
2138    ret=EXIT_FAILURE;
2139  }
2140 
2141#ifdef WIN32
2142  WSACleanup();
2143#endif // WIN32
2144
2145  return ret;
2146}
Note: See TracBrowser for help on using the browser.