root/tags/pdns-3.1.7.1/pdns/tcpreceiver.cc @ 1381

Revision 1381, 17.2 KB (checked in by ahu, 4 years ago)

the big std::exception cleanup for newer boost for 3.1.7.1, 1274, 1275, 1276

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002-2008  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2
7    as published by the Free Software Foundation
8   
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18#include "packetcache.hh"
19#include "utility.hh"
20#include <cstdio>
21#include <cstring>
22#include <cstdlib>
23#include <sys/types.h>
24#include <iostream>
25#include <string>
26#include "tcpreceiver.hh"
27
28#include <errno.h>
29#include <signal.h>
30
31#include "ueberbackend.hh"
32#include "dnspacket.hh"
33#include "nameserver.hh"
34#include "distributor.hh"
35#include "lock.hh"
36#include "logger.hh"
37#include "arguments.hh"
38
39#include "packethandler.hh"
40#include "statbag.hh"
41#include "resolver.hh"
42#include "communicator.hh"
43using namespace boost;
44
45extern PacketCache PC;
46extern StatBag S;
47
48/**
49\file tcpreceiver.cc
50\brief This file implements the tcpreceiver that receives and answers questions over TCP/IP
51*/
52
53pthread_mutex_t TCPNameserver::s_plock = PTHREAD_MUTEX_INITIALIZER;
54Semaphore *TCPNameserver::d_connectionroom_sem;
55PacketHandler *TCPNameserver::s_P; 
56int TCPNameserver::s_timeout;
57NetmaskGroup TCPNameserver::d_ng;
58
59void TCPNameserver::go()
60{
61  L<<Logger::Error<<"Creating backend connection for TCP"<<endl;
62  s_P=0;
63  try {
64    s_P=new PacketHandler;
65  }
66  catch(AhuException &ae) {
67    L<<Logger::Error<<Logger::NTLog<<"TCP server is unable to launch backends - will try again when questions come in"<<endl;
68    L<<Logger::Error<<"TCP server is unable to launch backends - will try again when questions come in: "<<ae.reason<<endl;
69  }
70  pthread_create(&d_tid, 0, launcher, static_cast<void *>(this));
71}
72
73void *TCPNameserver::launcher(void *data)
74{
75  static_cast<TCPNameserver *>(data)->thread();
76  return 0;
77}
78
79// throws AhuException if things didn't go according to plan, returns 0 if really 0 bytes were read
80int readnWithTimeout(int fd, void* buffer, unsigned int n, bool throwOnEOF=true)
81{
82  unsigned int bytes=n;
83  char *ptr = (char*)buffer;
84  int ret;
85  while(bytes) {
86    ret=read(fd, ptr, bytes);
87    if(ret < 0) {
88      if(errno==EAGAIN) {
89        ret=waitForData(fd, 5);
90        if(ret < 0)
91          throw runtime_error("Waiting for data read");
92        if(!ret)
93          throw runtime_error("Timeout reading data");
94        continue;
95      }
96      else
97        throw AhuException("Reading data: "+stringerror());
98    }
99    if(!ret) {
100      if(!throwOnEOF && n == bytes)
101        return 0;
102      else
103        throw AhuException("Did not fulfill read from TCP due to EOF");
104    }
105   
106    ptr += ret;
107    bytes -= ret;
108  }
109  return n;
110}
111
112// ditto
113void writenWithTimeout(int fd, const void *buffer, unsigned int n)
114{
115  unsigned int bytes=n;
116  const char *ptr = (char*)buffer;
117  int ret;
118  while(bytes) {
119    ret=write(fd, ptr, bytes);
120    if(ret < 0) {
121      if(errno==EAGAIN) {
122        ret=waitForRWData(fd, false, 5, 0);
123        if(ret < 0)
124          throw runtime_error("Waiting for data write");
125        if(!ret)
126          throw runtime_error("Timeout writing data");
127        continue;
128      }
129      else
130        throw AhuException("Writing data: "+stringerror());
131    }
132    if(!ret) {
133      throw AhuException("Did not fulfill TCP write due to EOF");
134    }
135   
136    ptr += ret;
137    bytes -= ret;
138  }
139}
140
141void connectWithTimeout(int fd, struct sockaddr* remote, size_t socklen)
142{
143  int err;
144  Utility::socklen_t len=sizeof(err);
145
146#ifndef WIN32
147  if((err=connect(fd, remote, socklen))<0 && errno!=EINPROGRESS) 
148#else
149  if((err=connect(clisock, remote, socklen))<0 && WSAGetLastError() != WSAEWOULDBLOCK ) 
150#endif // WIN32
151    throw AhuException("connect: "+stringerror());
152
153  if(!err)
154    goto done;
155 
156  err=waitForRWData(fd, false, 5, 0);
157  if(err == 0)
158    throw AhuException("Timeout connecting to remote");
159  if(err < 0)
160    throw AhuException("Error connecting to remote");
161
162  if(getsockopt(fd, SOL_SOCKET,SO_ERROR,(char *)&err,&len)<0)
163    throw AhuException("Error connecting to remote: "+stringerror()); // Solaris
164
165  if(err)
166    throw AhuException("Error connecting to remote: "+string(strerror(err)));
167
168 done:
169  ;
170}
171
172void TCPNameserver::sendPacket(shared_ptr<DNSPacket> p, int outsock)
173{
174  const char *buf=p->getData();
175  uint16_t len=htons(p->len);
176  writenWithTimeout(outsock, &len, 2);
177  writenWithTimeout(outsock, buf, p->len);
178}
179
180
181void TCPNameserver::getQuestion(int fd, char *mesg, int pktlen, const ComboAddress &remote)
182try
183{
184  readnWithTimeout(fd, mesg, pktlen);
185}
186catch(AhuException& ae) {
187    throw AhuException("Error reading DNS data from TCP client "+remote.toString()+": "+ae.reason);
188}
189
190static void proxyQuestion(shared_ptr<DNSPacket> packet)
191{
192  int sock=socket(AF_INET, SOCK_STREAM, 0);
193  if(sock < 0)
194    throw AhuException("Error making TCP connection socket to recursor: "+stringerror());
195
196  Utility::setNonBlocking(sock);
197  ServiceTuple st;
198  st.port=53;
199  parseService(::arg()["recursor"],st);
200
201  try {
202    ComboAddress recursor(st.host, st.port);
203    connectWithTimeout(sock, (struct sockaddr*)&recursor, recursor.getSocklen());
204    const string &buffer=packet->getString();
205   
206    uint16_t len=htons(buffer.length()), slen;
207   
208    writenWithTimeout(sock, &len, 2);
209    writenWithTimeout(sock, buffer.c_str(), buffer.length());
210   
211    int ret;
212   
213    ret=readnWithTimeout(sock, &len, 2);
214    len=ntohs(len);
215
216    char answer[len];
217    ret=readnWithTimeout(sock, answer, len);
218
219    slen=htons(len);
220    writenWithTimeout(packet->getSocket(), &slen, 2);
221   
222    writenWithTimeout(packet->getSocket(), answer, len);
223  }
224  catch(AhuException& ae) {
225    close(sock);
226    throw AhuException("While proxying a question to recursor "+st.host+": " +ae.reason);
227  }
228  close(sock);
229  return;
230}
231
232void *TCPNameserver::doConnection(void *data)
233{
234  shared_ptr<DNSPacket> packet;
235  // Fix gcc-4.0 error (on AMD64)
236  int fd=(int)(long)data; // gotta love C (generates a harmless warning on opteron)
237  pthread_detach(pthread_self());
238  Utility::setNonBlocking(fd);
239  try {
240    char mesg[512];
241   
242    DLOG(L<<"TCP Connection accepted on fd "<<fd<<endl);
243   
244    for(;;) {
245      ComboAddress remote;
246      socklen_t remotelen=sizeof(remote);
247      if(getpeername(fd, (struct sockaddr *)&remote, &remotelen) < 0) {
248        L<<Logger::Error<<"Received question from socket which had no remote address, dropping ("<<stringerror()<<")"<<endl;
249        break;
250      }
251
252      uint16_t pktlen;
253      if(!readnWithTimeout(fd, &pktlen, 2, false))
254        break;
255      else
256        pktlen=ntohs(pktlen);
257
258      if(pktlen>511) {
259        L<<Logger::Error<<"Received an overly large question from "<<remote.toString()<<", dropping"<<endl;
260        break;
261      }
262     
263      getQuestion(fd,mesg,pktlen,remote);
264      S.inc("tcp-queries");     
265
266      packet=shared_ptr<DNSPacket>(new DNSPacket);
267      packet->setRemote(&remote);
268      packet->d_tcp=true;
269      packet->setSocket(fd);
270      if(packet->parse(mesg, pktlen)<0)
271        break;
272     
273      if(packet->qtype.getCode()==QType::AXFR || packet->qtype.getCode()==QType::IXFR ) {
274        if(doAXFR(packet->qdomain, packet, fd)) 
275          S.inc("tcp-answers"); 
276        continue;
277      }
278
279      shared_ptr<DNSPacket> reply; 
280      shared_ptr<DNSPacket> cached= shared_ptr<DNSPacket>(new DNSPacket);
281
282      if(!packet->d.rd && (PC.get(packet.get(), cached.get()))) { // short circuit - does the PacketCache recognize this question?
283        cached->setRemote(&packet->remote);
284        cached->spoofID(packet->d.id);
285        sendPacket(cached, fd);
286        S.inc("tcp-answers");
287        continue;
288      }
289       
290      {
291        Lock l(&s_plock);
292        if(!s_P) {
293          L<<Logger::Error<<"TCP server is without backend connections, launching"<<endl;
294          s_P=new PacketHandler;
295        }
296        bool shouldRecurse;
297
298        reply=shared_ptr<DNSPacket>(s_P->questionOrRecurse(packet.get(), &shouldRecurse)); // we really need to ask the backend :-)
299
300        if(shouldRecurse) {
301          proxyQuestion(packet);
302          continue;
303        }
304      }
305
306      if(!reply)  // unable to write an answer?
307        break;
308       
309      S.inc("tcp-answers");
310      sendPacket(reply, fd);
311    }
312  }
313  catch(DBException &e) {
314    Lock l(&s_plock);
315    delete s_P;
316    s_P = 0;
317
318    L<<Logger::Error<<"TCP Connection Thread unable to answer a question because of a backend error, cycling"<<endl;
319  }
320  catch(AhuException &ae) {
321    Lock l(&s_plock);
322    delete s_P;
323    s_P = 0; // on next call, backend will be recycled
324    L<<Logger::Error<<"TCP nameserver had error, cycling backend: "<<ae.reason<<endl;
325  }
326  catch(std::exception &e) {
327    L<<Logger::Error<<"TCP Connection Thread died because of STL error: "<<e.what()<<endl;
328  }
329  catch( ... )
330  {
331    L << Logger::Error << "TCP Connection Thread caught unknown exception." << endl;
332  }
333  d_connectionroom_sem->post();
334  Utility::closesocket(fd);
335
336  return 0;
337}
338
339bool TCPNameserver::canDoAXFR(shared_ptr<DNSPacket> q)
340{
341  if(::arg().mustDo("disable-axfr"))
342    return false;
343
344  if( ::arg()["allow-axfr-ips"].empty() || d_ng.match( (ComboAddress *) &q->remote ) )
345    return true;
346
347  extern CommunicatorClass Communicator;
348
349  if(Communicator.justNotified(q->qdomain, q->getRemote())) { // we just notified this ip
350    L<<Logger::Warning<<"Approved AXFR of '"<<q->qdomain<<"' from recently notified slave "<<q->getRemote()<<endl;
351    return true;
352  }
353
354  return false;
355}
356
357/** do the actual zone transfer. Return 0 in case of error, 1 in case of success */
358int TCPNameserver::doAXFR(const string &target, shared_ptr<DNSPacket> q, int outsock)
359{
360  shared_ptr<DNSPacket> outpacket;
361  if(!canDoAXFR(q)) {
362    L<<Logger::Error<<"AXFR of domain '"<<target<<"' denied to "<<q->getRemote()<<endl;
363
364    outpacket=shared_ptr<DNSPacket>(q->replyPacket());
365    outpacket->setRcode(RCode::Refused); 
366    // FIXME: should actually figure out if we are auth over a zone, and send out 9 if we aren't
367    sendPacket(outpacket,outsock);
368    return 0;
369  }
370  L<<Logger::Error<<"AXFR of domain '"<<target<<"' initiated by "<<q->getRemote()<<endl;
371  outpacket=shared_ptr<DNSPacket>(q->replyPacket());
372
373  DNSResourceRecord soa; 
374  DNSResourceRecord rr;
375
376  SOAData sd;
377  sd.db=(DNSBackend *)-1; // force uncached answer
378  {
379    Lock l(&s_plock);
380   
381    // find domain_id via SOA and list complete domain. No SOA, no AXFR
382   
383    DLOG(L<<"Looking for SOA"<<endl);
384    if(!s_P) {
385      L<<Logger::Error<<"TCP server is without backend connections in doAXFR, launching"<<endl;
386      s_P=new PacketHandler;
387    }
388
389    if(!s_P->getBackend()->getSOA(target,sd)) {
390      L<<Logger::Error<<"AXFR of domain '"<<target<<"' failed: not authoritative"<<endl;
391      outpacket->setRcode(9); // 'NOTAUTH'
392      sendPacket(outpacket,outsock);
393      return 0;
394    }
395
396  }
397  PacketHandler P; // now open up a database connection, we'll need it
398
399  sd.db=(DNSBackend *)-1; // force uncached answer
400  if(!P.getBackend()->getSOA(target, sd)) {
401      L<<Logger::Error<<"AXFR of domain '"<<target<<"' failed: not authoritative in second instance"<<endl;
402    outpacket->setRcode(9); // 'NOTAUTH'
403    sendPacket(outpacket,outsock);
404    return 0;
405  }
406
407  soa.qname=target;
408  soa.qtype=QType::SOA;
409  soa.content=serializeSOAData(sd);
410  soa.ttl=sd.ttl;
411  soa.domain_id=sd.domain_id;
412  soa.d_place=DNSResourceRecord::ANSWER;
413   
414  if(!sd.db || sd.db==(DNSBackend *)-1) {
415    L<<Logger::Error<<"Error determining backend for domain '"<<target<<"' trying to serve an AXFR"<<endl;
416    outpacket->setRcode(RCode::ServFail);
417    sendPacket(outpacket,outsock);
418    return 0;
419  }
420 
421  DLOG(L<<"Issuing list command - opening dedicated database connection"<<endl);
422
423  DNSBackend *B=sd.db; // get the RIGHT backend
424
425  // now list zone
426  if(!(B->list(target, sd.domain_id))) { 
427    L<<Logger::Error<<"Backend signals error condition"<<endl;
428    outpacket->setRcode(2); // 'SERVFAIL'
429    sendPacket(outpacket,outsock);
430    return 0;
431  }
432  /* write first part of answer */
433
434  DLOG(L<<"Sending out SOA"<<endl);
435  outpacket->addRecord(soa); // AXFR format begins and ends with a SOA record, so we add one
436  sendPacket(outpacket, outsock);
437
438  /* now write all other records */
439
440  int count=0;
441  int chunk=100; // FIXME: this should probably be autosizing
442  if(::arg().mustDo("strict-rfc-axfrs"))
443    chunk=1;
444
445  outpacket=shared_ptr<DNSPacket>(q->replyPacket());
446  outpacket->setCompress(false);
447
448  while(B->get(rr)) {
449    if(rr.qtype.getCode()==6)
450      continue; // skip SOA - would indicate end of AXFR
451
452    outpacket->addRecord(rr);
453
454    if(!((++count)%chunk)) {
455      count=0;
456   
457      sendPacket(outpacket, outsock);
458
459      outpacket=shared_ptr<DNSPacket>(q->replyPacket());
460      outpacket->setCompress(false);
461      // FIXME: Subsequent messages SHOULD NOT have a question section, though the final message MAY.
462    }
463  }
464  if(count) {
465    sendPacket(outpacket, outsock);
466  }
467
468  DLOG(L<<"Done writing out records"<<endl);
469  /* and terminate with yet again the SOA record */
470  outpacket=shared_ptr<DNSPacket>(q->replyPacket());
471  outpacket->addRecord(soa);
472  sendPacket(outpacket, outsock);
473  DLOG(L<<"last packet - close"<<endl);
474  L<<Logger::Error<<"AXFR of domain '"<<target<<"' to "<<q->getRemote()<<" finished"<<endl;
475
476  return 1;
477}
478
479TCPNameserver::~TCPNameserver()
480{
481  delete d_connectionroom_sem;
482}
483
484TCPNameserver::TCPNameserver()
485{
486//  sem_init(&d_connectionroom_sem,0,::arg().asNum("max-tcp-connections"));
487  d_connectionroom_sem = new Semaphore( ::arg().asNum( "max-tcp-connections" ));
488
489  s_timeout=10;
490  vector<string>locals;
491  stringtok(locals,::arg()["local-address"]," ,");
492
493  vector<string>locals6;
494  stringtok(locals6,::arg()["local-ipv6"]," ,");
495
496  if(locals.empty() && locals6.empty())
497    throw AhuException("No local address specified");
498
499  d_highfd=0;
500
501  vector<string> parts;
502  stringtok( parts, ::arg()["allow-axfr-ips"], ", \t" ); // is this IP on the guestlist?
503  for( vector<string>::const_iterator i = parts.begin(); i != parts.end(); ++i ) {
504    d_ng.addMask( *i );
505  }
506
507#ifndef WIN32
508  signal(SIGPIPE,SIG_IGN);
509#endif // WIN32
510  FD_ZERO(&d_rfds); 
511
512  for(vector<string>::const_iterator laddr=locals.begin();laddr!=locals.end();++laddr) {
513    int s=socket(AF_INET,SOCK_STREAM,0); 
514
515    if(s<0) 
516      throw AhuException("Unable to acquire TCP socket: "+stringerror());
517
518    ComboAddress local(*laddr, ::arg().asNum("local-port"));
519     
520    int tmp=1;
521    if(setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
522      L<<Logger::Error<<"Setsockopt failed"<<endl;
523      exit(1); 
524    }
525
526    if(::bind(s, (sockaddr*)&local, local.getSocklen())<0) {
527      L<<Logger::Error<<"binding to TCP socket: "<<strerror(errno)<<endl;
528      throw AhuException("Unable to bind to TCP socket");
529    }
530   
531    listen(s,128);
532    L<<Logger::Error<<"TCP server bound to "<<local.toStringWithPort()<<endl;
533    d_sockets.push_back(s);
534    FD_SET(s, &d_rfds);
535    d_highfd=max(s,d_highfd);
536  }
537
538#if !WIN32 && HAVE_IPV6
539  for(vector<string>::const_iterator laddr=locals6.begin();laddr!=locals6.end();++laddr) {
540    int s=socket(AF_INET6,SOCK_STREAM,0); 
541
542    if(s<0) 
543      throw AhuException("Unable to acquire TCPv6 socket: "+stringerror());
544
545    ComboAddress local(*laddr, ::arg().asNum("local-port"));
546
547    int tmp=1;
548    if(setsockopt(s,SOL_SOCKET,SO_REUSEADDR,(char*)&tmp,sizeof tmp)<0) {
549      L<<Logger::Error<<"Setsockopt failed"<<endl;
550      exit(1); 
551    }
552
553    if(bind(s, (const sockaddr*)&local, local.getSocklen())<0) {
554      L<<Logger::Error<<"binding to TCP socket: "<<strerror(errno)<<endl;
555      throw AhuException("Unable to bind to TCPv6 socket");
556    }
557   
558    listen(s,128);
559    L<<Logger::Error<<"TCPv6 server bound to "<<local.toStringWithPort()<<endl;
560    d_sockets.push_back(s);
561    FD_SET(s, &d_rfds);
562    d_highfd=max(s,d_highfd);
563  }
564#endif // WIN32
565}
566
567
568//! Start of TCP operations thread, we launch a new thread for each incoming TCP question
569void TCPNameserver::thread()
570{
571  struct timeval tv;
572  tv.tv_sec=1;
573  tv.tv_usec=0;
574  try {
575    for(;;) {
576      int fd;
577      struct sockaddr_in remote;
578      Utility::socklen_t addrlen=sizeof(remote);
579
580      fd_set rfds=d_rfds; 
581
582      int ret=select(d_highfd+1, &rfds, 0, 0,  0); // blocks, forever if need be
583      if(ret <= 0)
584        continue;
585
586      int sock=-1;
587      for(vector<int>::const_iterator i=d_sockets.begin();i!=d_sockets.end();++i) {
588        if(FD_ISSET(*i, &rfds)) {
589          sock=*i;
590          addrlen=sizeof(remote);
591
592          if((fd=accept(sock, (sockaddr*)&remote, &addrlen))<0) {
593            L<<Logger::Error<<"TCP question accept error: "<<strerror(errno)<<endl;
594           
595            if(errno==EMFILE) {
596              L<<Logger::Error<<Logger::NTLog<<"TCP handler out of filedescriptors, exiting, won't recover from this"<<endl;
597              exit(1);
598            }
599          }
600          else {
601            pthread_t tid;
602            d_connectionroom_sem->wait(); // blocks if no connections are available
603
604            int room;
605            d_connectionroom_sem->getValue( &room);
606            if(room<1)
607              L<<Logger::Warning<<Logger::NTLog<<"Limit of simultaneous TCP connections reached - raise max-tcp-connections"<<endl;
608
609            if(pthread_create(&tid, 0, &doConnection, (void *)fd)) {
610              L<<Logger::Error<<"Error creating thread: "<<stringerror()<<endl;
611              d_connectionroom_sem->post();
612            }
613          }
614        }
615      }
616    }
617  }
618  catch(AhuException &AE) {
619    L<<Logger::Error<<"TCP Namerserver thread dying because of fatal error: "<<AE.reason<<endl;
620  }
621  catch(...) {
622    L<<Logger::Error<<"TCPNameserver dying because of an unexpected fatal error"<<endl;
623  }
624  exit(1); // take rest of server with us
625}
626
627
Note: See TracBrowser for help on using the browser.