Show
Ignore:
Timestamp:
08/08/10 21:59:19 (3 years ago)
Author:
ahu
Message:

implement pdns-distributes-queries to make powerdns distribute queries itself
implement 'processes', bringing back the old-school '--fork' option

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/pdns/pdns/pdns_recursor.cc

    r1678 r1684  
    100100bool g_quiet; 
    101101 
     102bool g_weDistributeQueries; // if true, only 1 thread listens on the incoming query sockets 
     103 
    102104static __thread NetmaskGroup* t_allowFrom; 
    103105static NetmaskGroup* g_initialAllowFrom; // new thread needs to be setup with this 
     
    811813} 
    812814  
     815string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, int fd) 
     816{ 
     817  ++g_stats.qcounter; 
     818 
     819  string response; 
     820  try { 
     821    uint32_t age; 
     822    if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(question, g_now.tv_sec, &response, &age)) { 
     823      if(!g_quiet) 
     824        L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl; 
     825 
     826      g_stats.packetCacheHits++; 
     827      SyncRes::s_queries++; 
     828      ageDNSPacket(response, age); 
     829      sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen()); 
     830      if(response.length() >= sizeof(struct dnsheader)) 
     831        updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); 
     832      g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec 
     833      return 0; 
     834    } 
     835  }  
     836  catch(std::exception& e) { 
     837    L<<Logger::Error<<"Error processing or aging answer packet: "<<e.what()<<endl; 
     838    return 0; 
     839  } 
     840   
     841   
     842  if(MT->numProcesses() > g_maxMThreads) { 
     843    g_stats.overCapacityDrops++; 
     844    return 0; 
     845  } 
     846   
     847  DNSComboWriter* dc = new DNSComboWriter(question.c_str(), question.size(), g_now); 
     848  dc->setSocket(fd); 
     849  dc->setRemote(&fromaddr); 
     850 
     851  dc->d_tcp=false; 
     852  MT->makeThread(startDoResolve, (void*) dc); // deletes dc 
     853  return 0; 
     854}  
     855  
    813856void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) 
    814857{ 
     
    836879      } 
    837880      else { 
    838         ++g_stats.qcounter; 
    839  
    840         string response; 
    841         try { 
    842           uint32_t age; 
    843           if(!SyncRes::s_nopacketcache && t_packetCache->getResponsePacket(string(data, len), g_now.tv_sec, &response, &age)) { 
    844             if(!g_quiet) 
    845               L<<Logger::Error<<t_id<< " question answered from packet cache from "<<fromaddr.toString()<<endl; 
    846    
    847             g_stats.packetCacheHits++; 
    848             SyncRes::s_queries++; 
    849             ageDNSPacket(response, age); 
    850             sendto(fd, response.c_str(), response.length(), 0, (struct sockaddr*) &fromaddr, fromaddr.getSocklen()); 
    851             if(response.length() >= sizeof(struct dnsheader)) 
    852               updateRcodeStats(((struct dnsheader*)response.c_str())->rcode); 
    853             g_stats.avgLatencyUsec=(uint64_t)((1-0.0001)*g_stats.avgLatencyUsec + 0); // we assume 0 usec 
    854             return; 
    855           } 
    856         }  
    857         catch(std::exception& e) { 
    858           throw MOADNSException(e.what()); // translate 
    859         } 
    860         if(MT->numProcesses() > g_maxMThreads) { 
    861           g_stats.overCapacityDrops++; 
    862           return; 
    863         } 
    864    
    865         DNSComboWriter* dc = new DNSComboWriter(data, len, g_now); 
    866         dc->setSocket(fd); 
    867         dc->setRemote(&fromaddr); 
    868  
    869         dc->d_tcp=false; 
    870  
    871         MT->makeThread(startDoResolve, (void*) dc); // deletes dc 
     881        string question(data, len); 
     882        if(g_weDistributeQueries) 
     883          distributeAsyncFunction(boost::bind(doProcessUDPQuestion, question, fromaddr, fd)); 
     884        else 
     885          doProcessUDPQuestion(question, fromaddr, fd); 
    872886      } 
    873887    } 
     
    884898  } 
    885899} 
     900 
    886901 
    887902typedef vector<pair<int, function< void(int, any&) > > > deferredAdd_t; 
     
    11991214{ 
    12001215  static unsigned int counter; 
    1201   unsigned int target = ++counter % g_pipes.size(); 
     1216  unsigned int target = 1 + (++counter % (g_pipes.size()-1)); 
    12021217  // cerr<<"Sending to: "<<target<<endl; 
    12031218  if(target == t_id) { 
     
    16441659 
    16451660  g_quiet=::arg().mustDo("quiet"); 
     1661  g_weDistributeQueries = ::arg().mustDo("pdns-distributes-queries"); 
     1662  if(g_weDistributeQueries) { 
     1663      L<<Logger::Warning<<"PowerDNS Recursor itself will distribute queries over threads"<<endl; 
     1664  } 
     1665   
    16461666  if(::arg().mustDo("trace")) { 
    16471667    SyncRes::setLog(true); 
     
    16981718  makeTCPServerSockets(); 
    16991719 
     1720  for(int forks = 0; forks < ::arg().asNum("processes") - 1; ++forks) { 
     1721    if(!fork()) // we are child 
     1722      break; 
     1723  } 
     1724   
    17001725  s_pidfname=::arg()["socket-dir"]+"/"+s_programname+".pid"; 
    17011726  if(!s_pidfname.empty()) 
     
    17321757   
    17331758   
    1734   g_numThreads = ::arg().asNum("threads"); 
     1759  g_numThreads = ::arg().asNum("threads") + ::arg().mustDo("pdns-distributes-queries"); 
    17351760   
    17361761  makeThreadPipes(); 
     
    18051830  t_fdm->addReadFD(g_pipes[t_id].readToThread, handlePipeRequest); 
    18061831 
    1807   for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)  
    1808     t_fdm->addReadFD(i->first, i->second); 
     1832  if(!g_weDistributeQueries || !t_id)  // if we distribute queries, only t_id = 0 listens 
     1833    for(deferredAdd_t::const_iterator i=deferredAdd.begin(); i!=deferredAdd.end(); ++i)  
     1834      t_fdm->addReadFD(i->first, i->second); 
    18091835   
    18101836  if(!t_id) { 
    1811      
    18121837    t_fdm->addReadFD(s_rcc.d_fd, handleRCC); // control channel 
    18131838  } 
    1814    
     1839 
    18151840  unsigned int maxTcpClients=::arg().asNum("max-tcp-clients"); 
    18161841   
     
    19281953    ::arg().set("network-timeout", "Wait this nummer of milliseconds for network i/o")="1500"; 
    19291954    ::arg().set("threads", "Launch this number of threads")="2"; 
     1955    ::arg().set("processes", "Launch this number of processes (EXPERIMENTAL, DO NOT CHANGE)")="1"; 
    19301956#ifdef WIN32 
    19311957    ::arg().set("quiet","Suppress logging of questions and answers")="off"; 
     
    19832009    ::arg().setSwitch( "disable-edns-ping", "Disable EDNSPing" )= "no";  
    19842010    ::arg().setSwitch( "disable-edns", "Disable EDNS" )= "";  
    1985     ::arg().setSwitch( "disable-packetcache", "Disable packetcahe" )= "no";  
     2011    ::arg().setSwitch( "disable-packetcache", "Disable packetcache" )= "no";  
     2012    ::arg().setSwitch( "pdns-distributes-queries", "If PowerDNS itself should distribute queries over threads (EXPERIMENTAL)")="no"; 
     2013     
    19862014 
    19872015    ::arg().setCmd("help","Provide a helpful message");