Changeset 1658

Show
Ignore:
Timestamp:
07/03/10 15:36:02 (3 years ago)
Author:
ahu
Message:

backport of master/slave parallel communicator

Location:
tags/pdns-2-9-22-x/pdns
Files:
3 added
7 modified

Legend:

Unmodified
Added
Removed
  • tags/pdns-2-9-22-x/pdns/Makefile.am

    r1256 r1658  
    3131dynlistener.cc dynlistener.hh  dynhandler.cc dynhandler.hh  \ 
    3232resolver.hh resolver.cc communicator.cc communicator.hh dnsproxy.cc \ 
     33mastercommunicator.cc slavecommunicator.cc \ 
    3334dnsproxy.hh randombackend.cc unix_utility.cc common_startup.cc \ 
    3435utility.hh iputils.hh common_startup.hh unix_semaphore.cc \ 
  • tags/pdns-2-9-22-x/pdns/communicator.cc

    r1312 r1658  
    11/* 
    22    PowerDNS Versatile Database Driven Nameserver 
    3     Copyright (C) 2002-2008  PowerDNS.COM BV 
     3    Copyright (C) 2002-2009  PowerDNS.COM BV 
    44 
    55    This program is free software; you can redistribute it and/or modify 
     
    3333#include <boost/lexical_cast.hpp> 
    3434 
    35 using namespace boost; 
     35// #include "namespaces.hh" 
    3636 
    37 void CommunicatorClass::addSuckRequest(const string &domain, const string &master, bool priority) 
    38 { 
    39   Lock l(&d_lock); 
    40    
    41   SuckRequest sr; 
    42   sr.domain = domain; 
    43   sr.master = master; 
    44  
    45   if(priority) { 
    46     d_suckdomains.push_front(sr); 
    47     //  d_havepriosuckrequest=true; 
    48   } 
    49   else  
    50     d_suckdomains.push_back(sr); 
    51    
    52   d_suck_sem.post(); 
    53   d_any_sem.post(); 
    54 } 
    55  
    56 void CommunicatorClass::suck(const string &domain,const string &remote) 
    57 { 
    58   L<<Logger::Error<<"Initiating transfer of '"<<domain<<"' from remote '"<<remote<<"'"<<endl; 
    59   uint32_t domain_id; 
    60   PacketHandler P; 
    61  
    62   DomainInfo di; 
    63   di.backend=0; 
    64   bool first=true;     
    65   try { 
    66     Resolver resolver; 
    67     resolver.axfr(remote, domain.c_str()); 
    68  
    69     UeberBackend *B=dynamic_cast<UeberBackend *>(P.getBackend()); 
    70  
    71     if(!B->getDomainInfo(domain, di) || !di.backend) { 
    72       L<<Logger::Error<<"Can't determine backend for domain '"<<domain<<"'"<<endl; 
    73       return; 
    74     } 
    75     domain_id=di.id; 
    76  
    77     Resolver::res_t recs; 
    78  
    79     while(resolver.axfrChunk(recs)) { 
    80       if(first) { 
    81         L<<Logger::Error<<"AXFR started for '"<<domain<<"', transaction started"<<endl; 
    82         di.backend->startTransaction(domain, domain_id); 
    83         first=false; 
    84       } 
    85       for(Resolver::res_t::iterator i=recs.begin();i!=recs.end();++i) { 
    86         if(!endsOn(i->qname, domain)) {  
    87           L<<Logger::Error<<"Remote "<<remote<<" tried to sneak in out-of-zone data '"<<i->qname<<"' during AXFR of zone '"<<domain<<"', ignoring"<<endl; 
    88           continue; 
    89         } 
    90         i->domain_id=domain_id; 
    91         if(i->qtype.getCode()>=1024) 
    92           throw DBException("Database can't store unknown record type "+lexical_cast<string>(i->qtype.getCode()-1024)); 
    93  
    94         di.backend->feedRecord(*i); 
    95       } 
    96     } 
    97     di.backend->commitTransaction(); 
    98     di.backend->setFresh(domain_id); 
    99     L<<Logger::Error<<"AXFR done for '"<<domain<<"', zone committed"<<endl; 
    100   } 
    101   catch(DBException &re) { 
    102     L<<Logger::Error<<"Unable to feed record during incoming AXFR of '"+domain+"': "<<re.reason<<endl; 
    103     if(di.backend && !first) { 
    104       L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl; 
    105       di.backend->abortTransaction(); 
    106     } 
    107   } 
    108   catch(ResolverException &re) { 
    109     L<<Logger::Error<<"Unable to AXFR zone '"+domain+"' from remote '"<<remote<<"': "<<re.reason<<endl; 
    110     if(di.backend && !first) { 
    111       L<<Logger::Error<<"Aborting possible open transaction for domain '"<<domain<<"' AXFR"<<endl; 
    112       di.backend->abortTransaction(); 
    113     } 
    114   } 
    115 } 
    116  
    117 class FindNS 
    118 { 
    119 public: 
    120   vector<string>lookup(const string &name, DNSBackend *B) 
    121   { 
    122     vector<string>addresses; 
    123     struct hostent *h; 
    124     h=gethostbyname(name.c_str()); 
    125  
    126     if(h) { 
    127       for(char **h_addr_list=h->h_addr_list;*h_addr_list;++h_addr_list) { 
    128         ostringstream os; 
    129         unsigned char *p=reinterpret_cast<unsigned char *>(*h_addr_list); 
    130         os<<(int)*p++<<"."; 
    131         os<<(int)*p++<<"."; 
    132         os<<(int)*p++<<"."; 
    133         os<<(int)*p++; 
    134  
    135         addresses.push_back(os.str()); 
    136       } 
    137     } 
    138  
    139     B->lookup(QType(QType::A),name); 
    140     DNSResourceRecord rr; 
    141     while(B->get(rr))  
    142       addresses.push_back(rr.content);   // SOL if you have a CNAME for an NS 
    143  
    144     return addresses; 
    145   } 
    146 }d_fns; 
    147  
    148 void CommunicatorClass::queueNotifyDomain(const string &domain, DNSBackend *B) 
    149 { 
    150   set<string> ips; 
    151    
    152   DNSResourceRecord rr; 
    153   set<string>nsset; 
    154  
    155   B->lookup(QType(QType::NS),domain); 
    156   while(B->get(rr))  
    157     nsset.insert(rr.content); 
    158    
    159   for(set<string>::const_iterator j=nsset.begin();j!=nsset.end();++j) { 
    160     vector<string>nsips=d_fns.lookup(*j, B); 
    161     if(nsips.empty()) 
    162       L<<Logger::Warning<<"Unable to queue notification of domain '"<<domain<<"': nameservers do not resolve!"<<endl; 
    163     for(vector<string>::const_iterator k=nsips.begin();k!=nsips.end();++k) 
    164       ips.insert(*k); 
    165   } 
    166    
    167   // make calls to d_nq.add(domain, ip); 
    168   for(set<string>::const_iterator j=ips.begin();j!=ips.end();++j) { 
    169     L<<Logger::Warning<<"Queued notification of domain '"<<domain<<"' to "<<*j<<endl; 
    170     d_nq.add(domain,*j); 
    171   } 
    172    
    173   set<string>alsoNotify; 
    174   B->alsoNotifies(domain, &alsoNotify); 
    175    
    176   for(set<string>::const_iterator j=alsoNotify.begin();j!=alsoNotify.end();++j) { 
    177     L<<Logger::Warning<<"Queued also-notification of domain '"<<domain<<"' to "<<*j<<endl; 
    178     d_nq.add(domain,*j); 
    179   } 
    180 } 
    181  
    182 bool CommunicatorClass::notifyDomain(const string &domain) 
    183 { 
    184   DomainInfo di; 
    185   PacketHandler P; 
    186   if(!P.getBackend()->getDomainInfo(domain, di)) { 
    187     L<<Logger::Error<<"No such domain '"<<domain<<"' in our database"<<endl; 
    188     return false; 
    189   } 
    190   queueNotifyDomain(domain, P.getBackend()); 
    191   // call backend and tell them we sent out the notification - even though that is premature     
    192   di.backend->setNotified(di.id, di.serial); 
    193  
    194   return true;  
    195 } 
    196  
    197  
    198 void CommunicatorClass::masterUpdateCheck(PacketHandler *P) 
    199 { 
    200   if(!::arg().mustDo("master")) 
    201     return;  
    202  
    203   UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend()); 
    204   vector<DomainInfo> cmdomains; 
    205   B->getUpdatedMasters(&cmdomains); 
    206    
    207   if(cmdomains.empty()) { 
    208     if(d_masterschanged) 
    209       L<<Logger::Warning<<"No master domains need notifications"<<endl; 
    210     d_masterschanged=false; 
    211   } 
    212   else { 
    213     d_masterschanged=true; 
    214     L<<Logger::Error<<cmdomains.size()<<" domain"<<(cmdomains.size()>1 ? "s" : "")<<" for which we are master need"<< 
    215       (cmdomains.size()>1 ? "" : "s")<< 
    216       " notifications"<<endl; 
    217   } 
    218  
    219   // figure out A records of everybody needing notification 
    220   // do this via the FindNS class, d_fns 
    221    
    222   for(vector<DomainInfo>::const_iterator i=cmdomains.begin();i!=cmdomains.end();++i) { 
    223     extern PacketCache PC; 
    224     vector<string> topurge; 
    225     topurge.push_back(i->zone); 
    226     PC.purge(topurge); // fixes cvstrac ticket #30 
    227     queueNotifyDomain(i->zone,P->getBackend()); 
    228     i->backend->setNotified(i->id,i->serial);  
    229   } 
    230 } 
    231  
    232 void CommunicatorClass::slaveRefresh(PacketHandler *P) 
    233 { 
    234   UeberBackend *B=dynamic_cast<UeberBackend *>(P->getBackend()); 
    235   vector<DomainInfo> sdomains; 
    236   B->getUnfreshSlaveInfos(&sdomains); 
    237    
    238   if(sdomains.empty()) 
    239   { 
    240     if(d_slaveschanged) 
    241       L<<Logger::Warning<<"All slave domains are fresh"<<endl; 
    242     d_slaveschanged=false; 
    243     return; 
    244   } 
    245   else  
    246     L<<Logger::Warning<<sdomains.size()<<" slave domain"<<(sdomains.size()>1 ? "s" : "")<<" need"<< 
    247       (sdomains.size()>1 ? "" : "s")<< 
    248       " checking"<<endl; 
    249   map<string, int> skipMasters; 
    250   for(vector<DomainInfo>::iterator i=sdomains.begin();i!=sdomains.end();++i) { 
    251     Resolver resolver;    
    252     resolver.makeUDPSocket();   
    253     d_slaveschanged=true; 
    254     uint32_t ourserial=i->serial, theirserial=0; 
    255      
    256     if(d_havepriosuckrequest) { 
    257       d_havepriosuckrequest=false; 
    258       break; 
    259     } 
    260  
    261     random_shuffle(i->masters.begin(), i->masters.end()); 
    262     for(vector<string>::const_iterator iter = i->masters.begin(); iter != i->masters.end(); ++iter) { 
    263       try { 
    264         if(skipMasters[*iter] > 5) 
    265           throw AhuException("Skipping query to '"+*iter+"' because of previous timeouts in this cycle"); 
    266          
    267         resolver.getSoaSerial(*iter, i->zone, &theirserial); 
    268         skipMasters[*iter]=0;    
    269         if(theirserial<i->serial) { 
    270           L<<Logger::Error<<"Domain "<<i->zone<<" more recent than master, our serial "<<ourserial<<" > their serial "<<theirserial<<endl; 
    271           i->backend->setFresh(i->id); 
    272         } 
    273         else if(theirserial==i->serial) { 
    274           L<<Logger::Warning<<"Domain "<<i->zone<<" is fresh"<<endl; 
    275           i->backend->setFresh(i->id); 
    276         } 
    277         else { 
    278           L<<Logger::Warning<<"Domain "<<i->zone<<" is stale, master serial "<<theirserial<<", our serial "<<i->serial<<endl; 
    279           addSuckRequest(i->zone, *iter); 
    280         } 
    281         break; 
    282       } 
    283       catch(ResolverException &re) { 
    284         if(re.reason.find("Timeout") != string::npos) 
    285           skipMasters[*iter]++; 
    286  
    287         L<<Logger::Error<<"Error trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl; 
    288         if(next(iter) != i->masters.end())  
    289           L<<Logger::Error<<"Trying next master '"<<*next(iter)<<"' for '"+i->zone+"'"<<endl; 
    290       } 
    291       catch(AhuException &re) { 
    292         L<<Logger::Error<<"Error trying to retrieve/refresh '"+i->zone+"': "+re.reason<<endl; 
    293         if(next(iter) != i->masters.end())  
    294           L<<Logger::Error<<"Trying next master '"<<*next(iter)<<"' for '"+i->zone+"'"<<endl; 
    295       } 
    296     } 
    297   } 
    298  
    299  
    300 time_t CommunicatorClass::doNotifications() 
    301 { 
    302   ComboAddress from; 
    303   Utility::socklen_t fromlen=sizeof(from); 
    304   char buffer[1500]; 
    305   int size; 
    306   static Resolver d_nresolver; 
    307   // receive incoming notifications on the nonblocking socket and take them off the list 
    308  
    309   while((size=recvfrom(d_nsock,buffer,sizeof(buffer),0,(struct sockaddr *)&from,&fromlen))>0) { 
    310     DNSPacket p; 
    311  
    312     p.setRemote(&from); 
    313  
    314     if(p.parse(buffer,size)<0) { 
    315       L<<Logger::Warning<<"Unable to parse SOA notification answer from "<<p.getRemote()<<endl; 
    316       continue; 
    317     } 
    318  
    319     if(p.d.rcode) 
    320       L<<Logger::Warning<<"Received unsuccesful notification report for '"<<p.qdomain<<"' from "<<p.getRemote()<<", rcode: "<<p.d.rcode<<endl;       
    321      
    322     if(d_nq.removeIf(p.getRemote(), p.d.id, p.qdomain)) 
    323       L<<Logger::Warning<<"Removed from notification list: '"<<p.qdomain<<"' to "<<p.getRemote()<< (p.d.rcode ? "" : " (was acknowledged)")<<endl;       
    324     else 
    325       L<<Logger::Warning<<"Received spurious notify answer for '"<<p.qdomain<<"' from "<<p.getRemote()<<endl;       
    326   } 
    327  
    328   // send out possible new notifications 
    329   string domain, ip; 
    330   uint16_t id; 
    331  
    332   bool purged; 
    333   while(d_nq.getOne(domain, ip, &id, purged)) { 
    334     if(!purged) { 
    335       try { 
    336         d_nresolver.notify(d_nsock, domain, ip, id); 
    337         drillHole(domain, ip); 
    338       } 
    339       catch(ResolverException &re) { 
    340         L<<Logger::Error<<"Error trying to resolve '"+ip+"' for notifying '"+domain+"' to server: "+re.reason<<endl; 
    341       } 
    342     } 
    343     else 
    344       L<<Logger::Error<<Logger::NTLog<<"Notification for "<<domain<<" to "<<ip<<" failed after retries"<<endl; 
    345   } 
    346  
    347   return d_nq.earliest(); 
    348 } 
    349  
    350 void CommunicatorClass::drillHole(const string &domain, const string &ip) 
    351 { 
    352   Lock l(&d_holelock); 
    353   d_holes[make_pair(domain,ip)]=time(0); 
    354 } 
    355  
    356 bool CommunicatorClass::justNotified(const string &domain, const string &ip) 
    357 { 
    358   Lock l(&d_holelock); 
    359   if(d_holes.find(make_pair(domain,ip))==d_holes.end()) // no hole 
    360     return false; 
    361  
    362   if(d_holes[make_pair(domain,ip)]>time(0)-900)    // recent hole 
    363     return true; 
    364  
    365   // do we want to purge this? XXX FIXME  
    366   return false; 
    367 } 
    368  
    369 void CommunicatorClass::makeNotifySocket() 
    370 { 
    371   if((d_nsock=socket(AF_INET, SOCK_DGRAM,0))<0) 
    372     throw AhuException(string("notification socket: ")+strerror(errno)); 
    373  
    374   struct sockaddr_in sin; 
    375   memset((char *)&sin,0, sizeof(sin)); 
    376    
    377   sin.sin_family = AF_INET; 
    378  
    379   // Bind to a specific IP (query-local-address) if specified 
    380   string querylocaladdress(::arg()["query-local-address"]); 
    381   if (querylocaladdress=="") { 
    382     sin.sin_addr.s_addr = INADDR_ANY; 
    383   } 
    384   else 
    385   { 
    386     struct hostent *h=0; 
    387     h=gethostbyname(querylocaladdress.c_str()); 
    388     if(!h) { 
    389       Utility::closesocket(d_nsock); 
    390       d_nsock=-1;        
    391       throw AhuException("Unable to resolve query local address"); 
    392     } 
    393  
    394     sin.sin_addr.s_addr = *(int*)h->h_addr; 
    395   } 
    396    
    397   int n=0; 
    398   for(;n<10;n++) { 
    399     sin.sin_port = htons(10000+(Utility::random()%50000)); 
    400      
    401     if(::bind(d_nsock, (struct sockaddr *)&sin, sizeof(sin)) >= 0)  
    402       break; 
    403   } 
    404   if(n==10) { 
    405     Utility::closesocket(d_nsock); 
    406     d_nsock=-1; 
    407     throw AhuException(string("binding notify socket: ")+strerror(errno)); 
    408   } 
    409   if( !Utility::setNonBlocking( d_nsock )) 
    410     throw AhuException(string("error getting or setting notify socket non-blocking: ")+strerror(errno)); 
    411  
    412 } 
    413  
    414 void CommunicatorClass::notify(const string &domain, const string &ip) 
    415 { 
    416   d_nq.add(domain, ip); 
    417  
    418   d_any_sem.post(); 
    419 } 
    42037 
    42138void CommunicatorClass::mainloop(void) 
     
    43855      slaveRefresh(&P); 
    43956      masterUpdateCheck(&P); 
    440  
    441       tick=min(doNotifications(), 
    442                d_tickinterval); 
     57      tick=doNotifications(); 
     58       
     59      tick = min (tick, d_tickinterval);  
    44360 
    44461      //      L<<Logger::Error<<"tick = "<<tick<<", d_tickinterval = "<<d_tickinterval<<endl; 
     
    44663 
    44764      while(time(0) < next) { 
    448         rc=d_any_sem.tryWait(); 
     65        rc=d_any_sem.tryWait(); 
    44966 
    450         if(rc) 
    451           Utility::sleep(1); 
    452         else {  
    453           if(!d_suck_sem.tryWait()) { 
    454             SuckRequest sr; 
    455             { 
    456               Lock l(&d_lock); 
    457               sr=d_suckdomains.front(); 
    458               d_suckdomains.pop_front(); 
    459             } 
    460             suck(sr.domain,sr.master); 
    461           } 
    462         } 
    463         // this gets executed at least once every second 
    464         doNotifications(); 
     67        if(rc) 
     68          Utility::sleep(1); 
     69        else {  
     70          if(!d_suck_sem.tryWait()) { 
     71            SuckRequest sr; 
     72            { 
     73              Lock l(&d_lock); 
     74              sr=d_suckdomains.front(); 
     75              d_suckdomains.pop_front(); 
     76            } 
     77            suck(sr.domain,sr.master); 
     78          } 
     79        } 
     80        // this gets executed at least once every second 
     81        doNotifications(); 
    46582      } 
    46683    } 
  • tags/pdns-2-9-22-x/pdns/communicator.hh

    r1105 r1658  
    6767 
    6868      if(i->id==id && i->ip==remote && i->domain==domain) { 
    69         d_nqueue.erase(i); 
    70         return true; 
     69        d_nqueue.erase(i); 
     70        return true; 
    7171      } 
    7272    } 
     
    7878    for(d_nqueue_t::iterator i=d_nqueue.begin();i!=d_nqueue.end();++i)  
    7979      if(i->next <= time(0)) { 
    80         i->attempts++; 
    81         purged=false; 
    82         i->next=time(0)+1+(1<<i->attempts); 
    83         domain=i->domain; 
    84         ip=i->ip; 
    85         *id=i->id; 
    86         purged=false; 
    87         if(i->attempts>4) { 
    88           purged=true; 
    89           d_nqueue.erase(i); 
    90         } 
    91         return true; 
     80        i->attempts++; 
     81        purged=false; 
     82        i->next=time(0)+1+(1<<i->attempts); 
     83        domain=i->domain; 
     84        ip=i->ip; 
     85        *id=i->id; 
     86        purged=false; 
     87        if(i->attempts>4) { 
     88          purged=true; 
     89          d_nqueue.erase(i); 
     90        } 
     91        return true; 
    9292      } 
    9393    return false; 
  • tags/pdns-2-9-22-x/pdns/dnsrecords.hh

    r1302 r1658  
    322322  includeboilerplate(SOA) 
    323323  SOARecordContent(const string& mname, const string& rname, const struct soatimes& st); 
     324  struct soatimes d_st; 
    324325 
    325326private: 
    326327  string d_mname; 
    327328  string d_rname; 
    328   struct soatimes d_st; 
     329 
    329330}; 
    330331 
  • tags/pdns-2-9-22-x/pdns/misc.hh

    r1260 r1658  
    304304  return tv.tv_sec + tv.tv_usec/1000000.0f; 
    305305} 
     306inline bool operator<(const struct timeval& lhs, const struct timeval& rhs)  
     307{ 
     308  return make_pair(lhs.tv_sec, lhs.tv_usec) < make_pair(rhs.tv_sec, rhs.tv_usec); 
     309} 
     310 
     311 
    306312struct CIStringCompare: public binary_function<string, string, bool>   
    307313{ 
  • tags/pdns-2-9-22-x/pdns/resolver.cc

    r1278 r1658  
    11/* 
    22    PowerDNS Versatile Database Driven Nameserver 
    3     Copyright (C) 2002 - 2008 PowerDNS.COM BV 
     3    Copyright (C) 2002 - 2009 PowerDNS.COM BV 
    44 
    55    This program is free software; you can redistribute it and/or modify 
     
    2626#include <algorithm> 
    2727#include <sstream> 
     28#include "dnsrecords.hh" 
    2829#include <cstring> 
    2930#include <string> 
     
    4142#include "dns_random.hh" 
    4243 
    43 using namespace boost; 
     44// #include "namespaces.hh" 
    4445 
    4546void Resolver::makeUDPSocket() 
     
    114115} 
    115116 
    116 char* Resolver::sendReceive(const string &ip, uint16_t remotePort, const char *packet, int length, unsigned int *replen) 
    117 { 
    118   makeTCPSocket(ip, remotePort); 
    119  
    120   if(sendData(packet,length,d_sock)<0)  
    121     throw ResolverException("Unable to send packet to remote nameserver "+ip+": "+stringerror()); 
    122  
    123   int plen=getLength(); 
    124   if(plen<0) 
    125     throw ResolverException("EOF trying to get length of answer from remote TCP server"); 
    126  
    127   char *answer=new char[plen]; 
    128   try { 
    129     timeoutReadn(answer,plen); 
    130     *replen=plen; 
    131     return answer; 
    132   } 
    133   catch(...) { 
    134     delete answer; 
    135     throw; // whop! 
    136   } 
    137   return 0; 
    138 } 
    139  
    140117int Resolver::notify(int sock, const string &domain, const string &ip, uint16_t id) 
    141118{ 
     
    152129} 
    153130 
    154 void Resolver::sendResolve(const string &ip, const char *domain, int type) 
     131uint16_t Resolver::sendResolve(const string &ip, const char *domain, int type) 
    155132{ 
    156133  vector<uint8_t> packet; 
     
    176153    throw ResolverException("Unable to ask query of "+st.host+":"+itoa(st.port)+": "+stringerror()); 
    177154  } 
     155  return d_randomid; 
     156} 
     157 
     158bool Resolver::tryGetSOASerial(string* domain, uint32_t *theirSerial, uint16_t* id) 
     159{ 
     160  Utility::setNonBlocking( d_sock ); 
     161   
     162  if(waitForData(d_sock, 0, 500000) == 0) 
     163    return false; 
     164   
     165  int err; 
     166  ComboAddress fromaddr; 
     167  socklen_t addrlen=fromaddr.getSocklen(); 
     168  err = recvfrom(d_sock, reinterpret_cast< char * >( d_buf ), 512, 0,(struct sockaddr*)(&fromaddr), &addrlen); 
     169  if(err < 0) { 
     170    if(errno == EAGAIN) 
     171      return false; 
     172     
     173    throw ResolverException("recvfrom error waiting for answer: "+stringerror()); 
     174  } 
     175   
     176  MOADNSParser mdp((char*)d_buf, err); 
     177  *id=mdp.d_header.id; 
     178  *domain = stripDot(mdp.d_qname); 
     179   
     180  if(mdp.d_answers.empty()) 
     181    throw ResolverException("Query to '" + fromaddr.toString() + "' for SOA of '" + *domain + "' produced no results"); 
     182   
     183  if(mdp.d_qtype != QType::SOA || mdp.d_answers.begin()->first.d_type != QType::SOA)  
     184    throw ResolverException("Query to '" + fromaddr.toString() + "' for SOA of '" + *domain + "' returned wrong record type"); 
     185 
     186  shared_ptr<SOARecordContent> rrc=boost::dynamic_pointer_cast<SOARecordContent>(mdp.d_answers.begin()->first.d_content); 
     187 
     188  *theirSerial=rrc->d_st.serial; 
     189   
     190   
     191  return true; 
    178192} 
    179193 
    180194int Resolver::receiveResolve(struct sockaddr* fromaddr, Utility::socklen_t addrlen) 
    181195{ 
    182   fd_set rd; 
    183   FD_ZERO(&rd); 
    184   FD_SET(d_sock, &rd); 
    185  
    186   struct timeval timeout; 
    187   timeout.tv_sec=0; 
    188   timeout.tv_usec=750000; 
    189  
    190   int res=select(d_sock+1,&rd,0,0,&timeout); 
    191  
     196  int res=waitForData(d_sock, 0, 7500000);  
     197   
    192198  if(!res) { 
    193199    throw ResolverException("Timeout waiting for answer"); 
     
    257263    goto done; 
    258264 
    259   fd_set rset,wset; 
    260   struct timeval tval; 
    261  
    262   FD_ZERO(&rset); 
    263   FD_SET(d_sock, &rset); 
    264   wset=rset; 
    265   tval.tv_sec=10; 
    266   tval.tv_usec=0; 
    267  
    268   if(!select(d_sock+1,&rset,&wset,0,tval.tv_sec ? &tval : 0)) { 
     265  err=waitForRWData(d_sock, false, 10, 0); // wait for writeability 
     266   
     267  if(!err) { 
    269268    Utility::closesocket(d_sock); // timeout 
    270269    d_sock=-1; 
     
    273272    throw ResolverException("Timeout connecting to server"); 
    274273  } 
    275    
    276   if(FD_ISSET(d_sock, &rset) || FD_ISSET(d_sock, &wset)) 
    277     { 
     274  else if(err < 0) { 
     275    throw ResolverException("Error connecting: "+string(strerror(err))); 
     276  } 
     277  else { 
    278278    Utility::socklen_t len=sizeof(err); 
    279       if(getsockopt(d_sock, SOL_SOCKET,SO_ERROR,(char *)&err,&len)<0) 
    280         throw ResolverException("Error connecting: "+stringerror()); // Solaris 
    281  
    282       if(err) 
    283         throw ResolverException("Error connecting: "+string(strerror(err))); 
    284  
    285     } 
    286   else 
    287     throw ResolverException("nonblocking connect failed"); 
    288  
     279    if(getsockopt(d_sock, SOL_SOCKET,SO_ERROR,(char *)&err,&len)<0) 
     280      throw ResolverException("Error connecting: "+stringerror()); // Solaris 
     281 
     282    if(err) 
     283      throw ResolverException("Error connecting: "+string(strerror(err))); 
     284  } 
     285   
    289286 done: 
    290287  Utility::setBlocking( d_sock ); 
     
    316313    throw ResolverException("Error sending question to "+ip+": "+stringerror()); 
    317314 
    318   fd_set rd; 
    319   FD_ZERO(&rd); 
    320   FD_SET(d_sock, &rd); 
    321  
    322   struct timeval timeout; 
    323   timeout.tv_sec=10; 
    324   timeout.tv_usec=0; 
    325  
    326   int res=select(d_sock+1,&rd,0,0,&timeout); 
     315  int res = waitForData(d_sock, 10, 0); 
     316   
    327317  if(!res) 
    328318    throw ResolverException("Timeout waiting for answer from "+ip+" during AXFR"); 
     
    408398    if(!d_inaxfr) { 
    409399      if(mdp->d_header.qdcount!=1) 
    410         throw ResolverException("resolver: received answer with wrong number of questions ("+itoa(mdp->d_header.qdcount)+")"); 
     400        throw ResolverException("resolver: received answer with wrong number of questions ("+itoa(mdp->d_header.qdcount)+")"); 
    411401       
    412402      if(mdp->d_qname != d_domain+".") 
    413         throw ResolverException(string("resolver: received an answer to another question (")+mdp->d_qname+"!="+d_domain+".)"); 
     403        throw ResolverException(string("resolver: received an answer to another question (")+mdp->d_qname+"!="+d_domain+".)"); 
    414404    } 
    415405     
     
    419409      rr.qname = i->first.d_label; 
    420410      if(!rr.qname.empty()) 
    421         erase_tail(rr.qname, 1); // strip . 
     411        boost::erase_tail(rr.qname, 1); // strip . 
    422412      rr.qtype = i->first.d_type; 
    423413      rr.ttl = i->first.d_ttl; 
    424414      rr.content = i->first.d_content->getZoneRepresentation(); 
     415      rr.priority = 0; 
    425416       
    426417      uint16_t qtype=rr.qtype.getCode(); 
    427418 
    428419      if(!rr.content.empty() && (qtype==QType::MX || qtype==QType::NS || qtype==QType::CNAME)) 
    429         erase_tail(rr.content, 1); 
     420        boost::erase_tail(rr.content, 1); 
    430421 
    431422      if(rr.qtype.getCode() == QType::MX) { 
    432         vector<string> parts; 
    433         stringtok(parts, rr.content); 
    434         rr.priority = atoi(parts[0].c_str()); 
    435         if(parts.size() > 1) 
    436           rr.content=parts[1]; 
     423        vector<string> parts; 
     424        stringtok(parts, rr.content); 
     425        rr.priority = atoi(parts[0].c_str()); 
     426        if(parts.size() > 1) 
     427          rr.content=parts[1]; 
    437428      } else if(rr.qtype.getCode() == QType::SRV) { 
    438         rr.priority = atoi(rr.content.c_str()); 
    439         vector<pair<string::size_type, string::size_type> > fields; 
    440         vstringtok(fields, rr.content, " "); 
    441         if(fields.size()==4) 
    442           rr.content=string(rr.content.c_str() + fields[1].first, fields[3].second - fields[1].first); 
     429        rr.priority = atoi(rr.content.c_str()); 
     430        vector<pair<string::size_type, string::size_type> > fields; 
     431        vstringtok(fields, rr.content, " "); 
     432        if(fields.size()==4) 
     433          rr.content=string(rr.content.c_str() + fields[1].first, fields[3].second - fields[1].first); 
    443434      } 
    444435      ret.push_back(rr); 
  • tags/pdns-2-9-22-x/pdns/resolver.hh

    r1015 r1658  
    5858  int notify(int sock, const string &domain, const string &ip, uint16_t id); 
    5959  int resolve(const string &ip, const char *domain, int type); 
    60   void sendResolve(const string &ip, const char *domain, int type); 
     60  uint16_t sendResolve(const string &ip, const char *domain, int type); 
     61  bool tryGetSOASerial(string* theirDomain, uint32_t* theirSerial, uint16_t* id); 
    6162 
    6263  int receiveResolve(struct sockaddr* fromaddr, Utility::socklen_t addrlen); 
    63   char* sendReceive(const string &ip, uint16_t remotePort, const char *packet, int length, unsigned int *replylen); 
     64   
    6465  void getSoaSerial(const string &, const string &, uint32_t *); 
    6566  int axfrChunk(Resolver::res_t &res);