root/trunk/pdns/pdns/communicator.cc @ 115

Revision 115, 11.8 KB (checked in by ahu, 10 years ago)

ultrasparc fixes+bind crasher fixes

  • Property svn:eol-style set to native
  • Property svn:keywords set to author date id revision
Line 
1/*
2    PowerDNS Versatile Database Driven Nameserver
3    Copyright (C) 2002  PowerDNS.COM BV
4
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; either version 2 of the License, or
8    (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18*/
19#include "utility.hh"
20#include <errno.h>
21#include "communicator.hh"
22#include <set>
23
24#include "dnsbackend.hh"
25#include "ueberbackend.hh"
26#include "packethandler.hh"
27#include "resolver.hh"
28#include "logger.hh"
29#include "dns.hh"
30#include "arguments.hh"
31#include "session.hh"
32
33
34void CommunicatorClass::addSuckRequest(const string &domain, const string &master)
35{
36  Lock l(&d_lock);
37 
38  SuckRequest sr;
39  sr.domain = domain;
40  sr.master = master;
41
42  d_suckdomains.push(sr);
43 
44  d_suck_sem.post();
45  d_any_sem.post();
46}
47
48
49void CommunicatorClass::suck(const string &domain,const string &remote)
50{
51  u_int32_t domain_id;
52  PacketHandler P;
53
54  DomainInfo di;
55  di.backend=0;
56  bool first=true;   
57  try {
58    Resolver resolver;
59    resolver.axfr(remote,domain.c_str());
60
61    UeberBackend *B=dynamic_cast<UeberBackend *>(P.getBackend());
62
63    if(!B->getDomainInfo(domain, di) || !di.backend) {
64      L<<Logger::Error<<"Can't determine backend for domain '"<<domain<<"'"<<endl;
65      return;
66    }
67    domain_id=di.id;
68
69    Resolver::res_t recs;
70
71    while(resolver.axfrChunk(recs)) {
72      if(first) {
73        L<<Logger::Error<<"AXFR started for '"<<domain<<"', transaction started"<<endl;
74        di.backend->startTransaction(domain, domain_id);
75        first=false;
76      }
77      for(Resolver::res_t::iterator i=recs.begin();i!=recs.end();++i) {
78        if((i->qname.size()-toLower(i->qname).rfind(toLower(domain)))!=domain.size()) {
79          L<<Logger::Error<<"Remote "<<remote<<" sneaked in out-of-zone data '"<<i->qname<<"' during AXFR of zone '"<<domain<<"'"<<endl;
80          di.backend->abortTransaction();
81          return;
82        }
83        i->domain_id=domain_id;
84        di.backend->feedRecord(*i);
85      }
86    }
87    di.backend->commitTransaction();
88    di.backend->setFresh(domain_id);
89    L<<Logger::Error<<"AXFR done for '"<<domain<<"', zone committed"<<endl;
90  }
91  catch(DBException &re) {
92    L<<Logger::Error<<"Unable to feed record during incoming AXFR of '"+domain+"': "<<re.reason<<endl;
93    if(di.backend && !first) {
94      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
95      di.backend->abortTransaction();
96    }
97  }
98  catch(ResolverException &re) {
99    L<<Logger::Error<<"Unable to AXFR zone '"+domain+"': "<<re.reason<<endl;
100    if(di.backend && !first) {
101      L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl;
102      di.backend->abortTransaction();
103    }
104  }
105}
106
107class FindNS
108{
109public:
110  vector<string>lookup(const string &name, DNSBackend *B)
111  {
112    vector<string>addresses;
113    struct hostent *h;
114    h=gethostbyname(name.c_str());
115
116    if(h) {
117      for(char **h_addr_list=h->h_addr_list;*h_addr_list;++h_addr_list) {
118        ostringstream os;
119        unsigned char *p=reinterpret_cast<unsigned char *>(*h_addr_list);
120        os<<(int)*p++<<".";
121        os<<(int)*p++<<".";
122        os<<(int)*p++<<".";
123        os<<(int)*p++;
124
125        addresses.push_back(os.str());
126      }
127    }
128
129    B->lookup(QType(QType::A),name);
130    DNSResourceRecord rr;
131    while(B->get(rr)) 
132      addresses.push_back(rr.content);   // SOL if you have a CNAME for an NS
133
134    return addresses;
135  }
136}d_fns;
137
138void CommunicatorClass::queueNotifyDomain(const string &domain, DNSBackend *B)
139{
140  set<string> ips;
141 
142  DNSResourceRecord rr;
143  set<string>nsset;
144
145  B->lookup(QType(QType::NS),domain);
146  while(B->get(rr)) 
147    nsset.insert(rr.content);
148 
149  for(set<string>::const_iterator j=nsset.begin();j!=nsset.end();++j) {
150    vector<string>nsips=d_fns.lookup(*j, B);
151    for(vector<string>::const_iterator k=nsips.begin();k!=nsips.end();++k)
152      ips.insert(*k);
153  }
154 
155  // make calls to d_nq.add(domain, ip);
156  for(set<string>::const_iterator j=ips.begin();j!=ips.end();++j) {
157    L<<Logger::Warning<<"Queued notification of domain '"<<domain<<"' to "<<*j<<endl;
158    d_nq.add(domain,*j);
159  }
160 
161  set<string>alsoNotify;
162  B->alsoNotifies(domain, &alsoNotify);
163 
164  for(set<string>::const_iterator j=alsoNotify.begin();j!=alsoNotify.end();++j) {
165    L<<Logger::Warning<<"Queued also-notification of domain '"<<domain<<"' to "<<*j<<endl;
166    d_nq.add(domain,*j);
167  }
168}
169
170bool CommunicatorClass::notifyDomain(const string &domain)
171{
172  DomainInfo di;
173  PacketHandler P;
174  if(!P.getBackend()->getDomainInfo(domain, di)) {
175    L<<Logger::Error<<"No such domain '"<<domain<<"' in our database"<<endl;
176    return false;
177  }
178  queueNotifyDomain(domain, P.getBackend());
179  // call backend and tell them we sent out the notification - even though that is premature   
180  di.backend->setNotified(di.id, di.serial);
181
182  return true; 
183}
184
185
186void CommunicatorClass::masterUpdateCheck(PacketHandler *P)
187{
188  if(!arg().mustDo("master"))
189    return; 
190
191  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
192  vector<DomainInfo> cmdomains;
193  B->getUpdatedMasters(&cmdomains);
194 
195  if(cmdomains.empty()) {
196    if(d_masterschanged)
197      L<<Logger::Error<<"No master domains need notifications"<<endl;
198    d_masterschanged=false;
199  }
200  else {
201    d_masterschanged=true;
202    L<<Logger::Error<<cmdomains.size()<<" domain"<<(cmdomains.size()>1 ? "s" : "")<<" for which we are master need"<<
203      (cmdomains.size()>1 ? "" : "s")<<
204      " notifications"<<endl;
205  }
206
207  // figure out A records of everybody needing notification
208  // do this via the FindNS class, d_fns
209 
210  for(vector<DomainInfo>::const_iterator i=cmdomains.begin();i!=cmdomains.end();++i) {
211
212    queueNotifyDomain(i->zone,P->getBackend());
213    i->backend->setNotified(i->id,i->serial); 
214  }
215}
216
217void CommunicatorClass::slaveRefresh(PacketHandler *P)
218{
219  UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend());
220  vector<DomainInfo> sdomains;
221  B->getUnfreshSlaveInfos(&sdomains);
222 
223  if(sdomains.empty())
224  {
225    if(d_slaveschanged)
226      L<<Logger::Error<<"All slave domains are fresh"<<endl;
227    d_slaveschanged=false;
228    return;
229  }
230  else 
231    L<<Logger::Error<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<<
232      (sdomains.size()>1 ? "" : "s")<<
233      " checking"<<endl;
234 
235  for(vector<DomainInfo>::const_iterator i=sdomains.begin();i!=sdomains.end();++i) {
236    d_slaveschanged=true;
237    u_int32_t ourserial=i->serial,theirserial=0;
238
239    try {
240      Resolver resolver;
241      int res=resolver.getSoaSerial(i->master,i->zone, &theirserial);
242      if(res<=0) {
243        L<<Logger::Error<<"Unable to determine SOA serial for "<<i->zone<<" at "<<i->master<<endl;
244        continue;
245      }
246     
247      if(theirserial<i->serial) {
248        L<<Logger::Error<<"Domain "<<i->zone<<" more recent than master, our serial "<<ourserial<<" > their serial "<<theirserial<<endl;
249        i->backend->setFresh(i->id);
250      }
251      else if(theirserial==i->serial) {
252        L<<Logger::Warning<<"Domain "<<i->zone<<" is fresh"<<endl;
253        i->backend->setFresh(i->id);
254      }
255      else {
256        L<<Logger::Error<<"Domain "<<i->zone<<" is stale, master serial "<<theirserial<<", our serial "<<i->serial<<endl;
257        addSuckRequest(i->zone,i->master);
258      }
259    }
260    catch(ResolverException &re) {
261      L<<Logger::Error<<"Trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl;
262    }
263  }
264} 
265
266
267
268int CommunicatorClass::doNotifications()
269{
270  struct sockaddr_in from;
271  Utility::socklen_t fromlen=sizeof(from);
272  char buffer[1500];
273  int size;
274  static Resolver d_nresolver;
275  // receive incoming notifications on the nonblocking socket and take them off the list
276
277#ifndef WIN32
278  while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) {
279#else
280  while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) {
281#endif
282
283
284
285    DNSPacket p;
286
287    p.setRemote((struct sockaddr *)&from, fromlen);
288
289    if(p.parse(buffer,size)<0) {
290      L<<Logger::Warning<<"Unable to parse SOA notification answer from "<<p.getRemote()<<endl;
291      continue;
292    }
293
294    if(p.d.rcode)
295      L<<Logger::Warning<<"Received unsuccesful notification report for '"<<p.qdomain<<"' from "<<p.getRemote()<<", rcode: "<<p.d.rcode<<endl;     
296   
297    if(d_nq.removeIf(p.getRemote(), p.d.id, p.qdomain))
298      L<<Logger::Warning<<"Removed from notification list: '"<<p.qdomain<<"' to "<<p.getRemote()<< (p.d.rcode ? "" : " (was acknowledged)")<<endl;     
299    else
300      L<<Logger::Warning<<"Received spurious notify answer for '"<<p.qdomain<<"' from "<<p.getRemote()<<endl;     
301  }
302
303  // send out possible new notifications
304  string domain, ip;
305  u_int16_t id;
306
307  bool purged;
308  while(d_nq.getOne(domain, ip, &id, purged)) {
309    if(!purged) {
310      d_nresolver.notify(d_nsock,domain,ip,id);
311      drillHole(domain,ip);
312    }
313    else
314      L<<Logger::Error<<Logger::NTLog<<"Notification for "<<domain<<" to "<<ip<<" failed after retries"<<endl;
315  }
316
317  return d_nq.earliest();
318}
319
320void CommunicatorClass::drillHole(const string &domain, const string &ip)
321{
322  Lock l(&d_holelock);
323  d_holes[make_pair(domain,ip)]=time(0);
324}
325
326bool CommunicatorClass::justNotified(const string &domain, const string &ip)
327{
328  Lock l(&d_holelock);
329  if(d_holes.find(make_pair(domain,ip))==d_holes.end()) // no hole
330    return false;
331
332  if(d_holes[make_pair(domain,ip)]>time(0)-900)    // recent hole
333    return true;
334
335  // do we want to purge this? XXX FIXME
336  return false;
337}
338
339void CommunicatorClass::makeNotifySocket()
340{
341  if((d_nsock=socket(AF_INET, SOCK_DGRAM,0))<0)
342    throw AhuException(string("notification socket: ")+strerror(errno));
343
344  struct sockaddr_in sin;
345  memset((char *)&sin,0, sizeof(sin));
346 
347  sin.sin_family = AF_INET;
348  sin.sin_addr.s_addr = INADDR_ANY;
349  int n=0;
350  for(;n<10;n++) {
351    sin.sin_port = htons(10000+(Utility::random()%50000));
352   
353    if(bind(d_nsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0) 
354      break;
355  }
356  if(n==10) {
357    Utility::closesocket(d_nsock);
358    d_nsock=-1;
359    throw AhuException(string("binding dnsproxy socket: ")+strerror(errno));
360  }
361  if( !Utility::setNonBlocking( d_nsock ))
362    throw AhuException(string("error getting or setting notify socket non-blocking: ")+strerror(errno));
363
364}
365
366void CommunicatorClass::notify(const string &domain, const string &ip)
367{
368  d_nq.add(domain,ip);
369
370  d_any_sem.post();
371}
372
373void CommunicatorClass::mainloop(void)
374{
375  try {
376#ifndef WIN32
377    signal(SIGPIPE,SIG_IGN);
378#endif // WIN32
379    L<<Logger::Error<<"Master/slave communicator launching"<<endl;
380    PacketHandler P;
381    d_tickinterval=arg().asNum("slave-cycle-interval");
382    makeNotifySocket();
383
384    int rc;
385    time_t next;
386
387    int tick;
388
389    for(;;) {
390      slaveRefresh(&P);
391      masterUpdateCheck(&P);
392
393      tick=min(doNotifications(),
394               d_tickinterval);
395
396      next=time(0)+tick;
397
398      while(time(0)<next) {
399        rc=d_any_sem.tryWait();
400
401        if(rc)
402          Utility::sleep(1);
403        else { 
404          if(!d_suck_sem.tryWait()) {
405            SuckRequest sr;
406            {
407              Lock l(&d_lock);
408              sr=d_suckdomains.front();
409              d_suckdomains.pop();
410            }
411            suck(sr.domain,sr.master);
412          }
413        }
414        // this gets executed at least once every second
415        doNotifications();
416      }
417    }
418  }
419  catch(AhuException &ae) {
420    L<<Logger::Error<<"Communicator thread died because of error: "<<ae.reason<<endl;
421    Utility::sleep(1);
422    exit(0);
423  }
424  catch(exception &e) {
425    L<<Logger::Error<<"Communicator thread died because of STL error: "<<e.what()<<endl;
426    exit(0);
427  }
428  catch( ... )
429  {
430    L << Logger::Error << "Communicator caught unknown exception." << endl;
431    exit( 0 );
432  }
433}
434
Note: See TracBrowser for help on using the browser.