root/trunk/pdns/pdns/communicator.cc @ 1015

Revision 1015, 12.6 KB (checked in by ahu, 6 years ago)

fix some MOADNSParser problems (notifications, for example), remove dead code, eliminate duplicate code, improve error reporting as reported in ticket 147

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002-2005  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License version 2 as
7    published by the Free Software Foundation;
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*/
18
19#include "utility.hh"
20#include <errno.h>
21#include "communicator.hh"
22#include <set>
23
24#include "dnsbackend.hh"
25#include "ueberbackend.hh"
26#include "packethandler.hh"
27#include "resolver.hh"
28#include "logger.hh"
29#include "dns.hh"
30#include "arguments.hh"
31#include "session.hh"
32#include "packetcache.hh"
33#include <boost/lexical_cast.hpp>
34
35using namespace boost;
36
37void CommunicatorClass::addSuckRequest(const string &domain, const string &master, bool priority)
38{
39  Lock l(&d_lock);
40 
41  SuckRequest sr;
42  sr.domain = domain;
43  sr.master = master;
44
45  if(priority) {
46    d_suckdomains.push_front(sr);
47    d_havepriosuckrequest=true;
48  }
49  else 
50    d_suckdomains.push_back(sr);
51 
52  d_suck_sem.post();
53  d_any_sem.post();
54}
55
56
57void CommunicatorClass::suck(const string &domain,const string &remote)
58{
59  uint32_t domain_id;
60  PacketHandler P;
61
62  DomainInfo di;
63  di.backend=0;
64  bool first=true;   
65  try {
66    Resolver resolver;
67    resolver.axfr(remote,domain.c_str());
68
69    UeberBackend *B=dynamic_cast<UeberBackend *>(P.getBackend());
70
71    if(!B->getDomainInfo(domain, di) || !di.backend) {
72      L<<Logger::Error<<"Can't determine backend for domain '"<<domain<<"'"<<endl;
73      return;
74    }
75    domain_id=di.id;
76
77    Resolver::res_t recs;
78
79    while(resolver.axfrChunk(recs)) {
80      if(first) {
81        L<<Logger::Error<<"AXFR started for '"<<domain<<"', transaction started"<<endl;
82        di.backend->startTransaction(domain, domain_id);
83        first=false;
84      }
85      for(Resolver::res_t::iterator i=recs.begin();i!=recs.end();++i) {
86        if(!endsOn(i->qname, domain)) { 
87          L<<Logger::Error<<"Remote "<<remote<<" sneaked in out-of-zone data '"<<i->qname<<"' during AXFR of zone '"<<domain<<"'"<<endl;
88          di.backend->abortTransaction();
89          return;
90        }
91        i->domain_id=domain_id;
92        if(i->qtype.getCode()>=1024)
93          throw DBException("Database can't store unknown record type "+lexical_cast<string>(i->qtype.getCode()-1024));
94
95        di.backend->feedRecord(*i);
96      }
97    }
98    di.backend->commitTransaction();
99    di.backend->setFresh(domain_id);
100    L<<Logger::Error<<"AXFR done for '"<<domain<<"', zone committed"<<endl;
101  }
102  catch(DBException &re) {
103    L<<Logger::Error<<"Unable to feed record during incoming AXFR of '"+domain+"': "<<re.reason<<endl;
104    if(di.backend && !first) {
105      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
106      di.backend->abortTransaction();
107    }
108  }
109  catch(ResolverException &re) {
110    L<<Logger::Error<<"Unable to AXFR zone '"+domain+"': "<<re.reason<<endl;
111    if(di.backend && !first) {
112      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
113      di.backend->abortTransaction();
114    }
115  }
116}
117
118class FindNS
119{
120public:
121  vector<string>lookup(const string &name, DNSBackend *B)
122  {
123    vector<string>addresses;
124    struct hostent *h;
125    h=gethostbyname(name.c_str());
126
127    if(h) {
128      for(char **h_addr_list=h->h_addr_list;*h_addr_list;++h_addr_list) {
129        ostringstream os;
130        unsigned char *p=reinterpret_cast<unsigned char *>(*h_addr_list);
131        os<<(int)*p++<<".";
132        os<<(int)*p++<<".";
133        os<<(int)*p++<<".";
134        os<<(int)*p++;
135
136        addresses.push_back(os.str());
137      }
138    }
139
140    B->lookup(QType(QType::A),name);
141    DNSResourceRecord rr;
142    while(B->get(rr)) 
143      addresses.push_back(rr.content);   // SOL if you have a CNAME for an NS
144
145    return addresses;
146  }
147}d_fns;
148
149void CommunicatorClass::queueNotifyDomain(const string &domain, DNSBackend *B)
150{
151  set<string> ips;
152 
153  DNSResourceRecord rr;
154  set<string>nsset;
155
156  B->lookup(QType(QType::NS),domain);
157  while(B->get(rr)) 
158    nsset.insert(rr.content);
159 
160  for(set<string>::const_iterator j=nsset.begin();j!=nsset.end();++j) {
161    vector<string>nsips=d_fns.lookup(*j, B);
162    if(nsips.empty())
163      L<<Logger::Warning<<"Unable to queue notification of domain '"<<domain<<"': nameservers do not resolve!"<<endl;
164    for(vector<string>::const_iterator k=nsips.begin();k!=nsips.end();++k)
165      ips.insert(*k);
166  }
167 
168  // make calls to d_nq.add(domain, ip);
169  for(set<string>::const_iterator j=ips.begin();j!=ips.end();++j) {
170    L<<Logger::Warning<<"Queued notification of domain '"<<domain<<"' to "<<*j<<endl;
171    d_nq.add(domain,*j);
172  }
173 
174  set<string>alsoNotify;
175  B->alsoNotifies(domain, &alsoNotify);
176 
177  for(set<string>::const_iterator j=alsoNotify.begin();j!=alsoNotify.end();++j) {
178    L<<Logger::Warning<<"Queued also-notification of domain '"<<domain<<"' to "<<*j<<endl;
179    d_nq.add(domain,*j);
180  }
181}
182
183bool CommunicatorClass::notifyDomain(const string &domain)
184{
185  DomainInfo di;
186  PacketHandler P;
187  if(!P.getBackend()->getDomainInfo(domain, di)) {
188    L<<Logger::Error<<"No such domain '"<<domain<<"' in our database"<<endl;
189    return false;
190  }
191  queueNotifyDomain(domain, P.getBackend());
192  // call backend and tell them we sent out the notification - even though that is premature   
193  di.backend->setNotified(di.id, di.serial);
194
195  return true; 
196}
197
198
199void CommunicatorClass::masterUpdateCheck(PacketHandler *P)
200{
201  if(!arg().mustDo("master"))
202    return; 
203
204  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
205  vector<DomainInfo> cmdomains;
206  B->getUpdatedMasters(&cmdomains);
207 
208  if(cmdomains.empty()) {
209    if(d_masterschanged)
210      L<<Logger::Warning<<"No master domains need notifications"<<endl;
211    d_masterschanged=false;
212  }
213  else {
214    d_masterschanged=true;
215    L<<Logger::Error<<cmdomains.size()<<" domain"<<(cmdomains.size()>1 ? "s" : "")<<" for which we are master need"<<
216      (cmdomains.size()>1 ? "" : "s")<<
217      " notifications"<<endl;
218  }
219
220  // figure out A records of everybody needing notification
221  // do this via the FindNS class, d_fns
222 
223  for(vector<DomainInfo>::const_iterator i=cmdomains.begin();i!=cmdomains.end();++i) {
224    extern PacketCache PC;
225    PC.purge(i->zone); // fixes cvstrac ticket #30
226    queueNotifyDomain(i->zone,P->getBackend());
227    i->backend->setNotified(i->id,i->serial); 
228  }
229}
230
231void CommunicatorClass::slaveRefresh(PacketHandler *P)
232{
233  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
234  vector<DomainInfo> sdomains;
235  B->getUnfreshSlaveInfos(&sdomains);
236 
237  if(sdomains.empty())
238  {
239    if(d_slaveschanged)
240      L<<Logger::Warning<<"All slave domains are fresh"<<endl;
241    d_slaveschanged=false;
242    return;
243  }
244  else 
245    L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
246      (sdomains.size()>1 ? "" : "s")<<
247      " checking"<<endl;
248
249  for(vector<DomainInfo>::const_iterator i=sdomains.begin();i!=sdomains.end();++i) {
250    Resolver resolver;   
251    resolver.makeUDPSocket(); 
252    d_slaveschanged=true;
253    uint32_t ourserial=i->serial,theirserial=0;
254
255    try {
256      if(d_havepriosuckrequest) {
257        d_havepriosuckrequest=false;
258        break;
259      }
260
261      resolver.getSoaSerial(i->master, i->zone, &theirserial);
262     
263      if(theirserial<i->serial) {
264        L<<Logger::Error<<"Domain "<<i->zone<<" more recent than master, our serial "<<ourserial<<" > their serial "<<theirserial<<endl;
265        i->backend->setFresh(i->id);
266      }
267      else if(theirserial==i->serial) {
268        L<<Logger::Warning<<"Domain "<<i->zone<<" is fresh"<<endl;
269        i->backend->setFresh(i->id);
270      }
271      else {
272        L<<Logger::Warning<<"Domain "<<i->zone<<" is stale, master serial "<<theirserial<<", our serial "<<i->serial<<endl;
273        addSuckRequest(i->zone,i->master);
274      }
275    }
276    catch(ResolverException &re) {
277      L<<Logger::Error<<"Error trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl;
278    }
279  }
280} 
281
282
283
284int CommunicatorClass::doNotifications()
285{
286  ComboAddress from;
287  Utility::socklen_t fromlen=sizeof(from);
288  char buffer[1500];
289  int size;
290  static Resolver d_nresolver;
291  // receive incoming notifications on the nonblocking socket and take them off the list
292
293  while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) {
294    DNSPacket p;
295
296    p.setRemote(&from);
297
298    if(p.parse(buffer,size)<0) {
299      L<<Logger::Warning<<"Unable to parse SOA notification answer from "<<p.getRemote()<<endl;
300      continue;
301    }
302
303    if(p.d.rcode)
304      L<<Logger::Warning<<"Received unsuccesful notification report for '"<<p.qdomain<<"' from "<<p.getRemote()<<", rcode: "<<p.d.rcode<<endl;     
305   
306    if(d_nq.removeIf(p.getRemote(), p.d.id, p.qdomain))
307      L<<Logger::Warning<<"Removed from notification list: '"<<p.qdomain<<"' to "<<p.getRemote()<< (p.d.rcode ? "" : " (was acknowledged)")<<endl;     
308    else
309      L<<Logger::Warning<<"Received spurious notify answer for '"<<p.qdomain<<"' from "<<p.getRemote()<<endl;     
310  }
311
312  // send out possible new notifications
313  string domain, ip;
314  uint16_t id;
315
316  bool purged;
317  while(d_nq.getOne(domain, ip, &id, purged)) {
318    if(!purged) {
319      try {
320        d_nresolver.notify(d_nsock, domain, ip, id);
321        drillHole(domain, ip);
322      }
323      catch(ResolverException &re) {
324        L<<Logger::Error<<"Error trying to resolve '"+ip+"' for notifying '"+domain+"' to server: "+re.reason<<endl;
325      }
326    }
327    else
328      L<<Logger::Error<<Logger::NTLog<<"Notification for "<<domain<<" to "<<ip<<" failed after retries"<<endl;
329  }
330
331  return d_nq.earliest();
332}
333
334void CommunicatorClass::drillHole(const string &domain, const string &ip)
335{
336  Lock l(&d_holelock);
337  d_holes[make_pair(domain,ip)]=time(0);
338}
339
340bool CommunicatorClass::justNotified(const string &domain, const string &ip)
341{
342  Lock l(&d_holelock);
343  if(d_holes.find(make_pair(domain,ip))==d_holes.end()) // no hole
344    return false;
345
346  if(d_holes[make_pair(domain,ip)]>time(0)-900)    // recent hole
347    return true;
348
349  // do we want to purge this? XXX FIXME
350  return false;
351}
352
353void CommunicatorClass::makeNotifySocket()
354{
355  if((d_nsock=socket(AF_INET, SOCK_DGRAM,0))<0)
356    throw AhuException(string("notification socket: ")+strerror(errno));
357
358  struct sockaddr_in sin;
359  memset((char *)&sin,0, sizeof(sin));
360 
361  sin.sin_family = AF_INET;
362
363  // Bind to a specific IP (query-local-address) if specified
364  string querylocaladdress(arg()["query-local-address"]);
365  if (querylocaladdress=="") {
366    sin.sin_addr.s_addr = INADDR_ANY;
367  }
368  else
369  {
370    struct hostent *h=0;
371    h=gethostbyname(querylocaladdress.c_str());
372    if(!h) {
373      Utility::closesocket(d_nsock);
374      d_nsock=-1;       
375      throw AhuException("Unable to resolve query local address");
376    }
377
378    sin.sin_addr.s_addr = *(int*)h->h_addr;
379  }
380 
381  int n=0;
382  for(;n<10;n++) {
383    sin.sin_port = htons(10000+(Utility::random()%50000));
384   
385    if(bind(d_nsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
386      break;
387  }
388  if(n==10) {
389    Utility::closesocket(d_nsock);
390    d_nsock=-1;
391    throw AhuException(string("binding notify socket: ")+strerror(errno));
392  }
393  if( !Utility::setNonBlocking( d_nsock ))
394    throw AhuException(string("error getting or setting notify socket non-blocking: ")+strerror(errno));
395
396}
397
398void CommunicatorClass::notify(const string &domain, const string &ip)
399{
400  d_nq.add(domain, ip);
401
402  d_any_sem.post();
403}
404
405void CommunicatorClass::mainloop(void)
406{
407  try {
408#ifndef WIN32
409    signal(SIGPIPE,SIG_IGN);
410#endif // WIN32
411    L<<Logger::Error<<"Master/slave communicator launching"<<endl;
412    PacketHandler P;
413    d_tickinterval=arg().asNum("slave-cycle-interval");
414    makeNotifySocket();
415
416    int rc;
417    time_t next;
418
419    int tick;
420
421    for(;;) {
422      slaveRefresh(&P);
423      masterUpdateCheck(&P);
424
425      tick=min(doNotifications(),
426               d_tickinterval);
427
428      next=time(0)+tick;
429
430      while(time(0)<next) {
431        rc=d_any_sem.tryWait();
432
433        if(rc)
434          Utility::sleep(1);
435        else { 
436          if(!d_suck_sem.tryWait()) {
437            SuckRequest sr;
438            {
439              Lock l(&d_lock);
440              sr=d_suckdomains.front();
441              d_suckdomains.pop_front();
442            }
443            suck(sr.domain,sr.master);
444          }
445        }
446        // this gets executed at least once every second
447        doNotifications();
448      }
449    }
450  }
451  catch(AhuException &ae) {
452    L<<Logger::Error<<"Communicator thread died because of error: "<<ae.reason<<endl;
453    Utility::sleep(1);
454    exit(0);
455  }
456  catch(exception &e) {
457    L<<Logger::Error<<"Communicator thread died because of STL error: "<<e.what()<<endl;
458    exit(0);
459  }
460  catch( ... )
461  {
462    L << Logger::Error << "Communicator caught unknown exception." << endl;
463    exit( 0 );
464  }
465}
466
Note: See TracBrowser for help on using the browser.