Changeset 1960

Show
Ignore:
Timestamp:
02/02/11 00:12:40 (2 years ago)
Author:
ahu
Message:

make the signingpipe multithreaded, achieving around 8000 RSASHA256/1024 signatures/s so far on an 8 core machine

Location:
trunk/pdns/pdns
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • trunk/pdns/pdns/common_startup.cc

    r1950 r1960  
    6767  ::arg().set("default-soa-name","name to insert in the SOA record if none set in the backend")="a.misconfigured.powerdns.server"; 
    6868  ::arg().set("distributor-threads","Default number of Distributor (backend) threads to start")="3"; 
     69  ::arg().set("signing-threads","Default number of signer threads to start")="3"; 
    6970  ::arg().set("receiver-threads","Default number of Distributor (backend) threads to start")="1"; 
    7071  ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500";  
  • trunk/pdns/pdns/signingpipe.cc

    r1959 r1960  
    11#include "signingpipe.hh" 
     2 
     3AtomicCounter ChunkedSigningPipe::s_workerid; 
     4 
     5void* ChunkedSigningPipe::helperWorker(void* p) 
     6{ 
     7  ChunkedSigningPipe* us = (ChunkedSigningPipe*)p; 
     8  us->worker(); 
     9  return 0; 
     10} 
     11 
     12ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers)  
     13  : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers), 
     14    d_mustSign(mustSign) 
     15{ 
     16  if(!d_mustSign) 
     17    return; 
     18  if(pipe(d_uppipe) < 0 || pipe(d_backpipe)) 
     19    throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe"); 
     20   
     21  Utility::setNonBlocking(d_backpipe[0]); 
     22  for(unsigned int n=0; n < d_numworkers; ++n) { 
     23    pthread_create(&d_tids[n], 0, helperWorker, (void*) this); 
     24  } 
     25} 
     26 
     27ChunkedSigningPipe::~ChunkedSigningPipe() 
     28{ 
     29  if(!d_mustSign) 
     30    return; 
     31  close(d_uppipe[1]); // this will trigger all threads to exit 
     32  void* res; 
     33  for(unsigned int n = 0; n < d_numworkers; ++n) 
     34    pthread_join(d_tids[n], &res); 
     35   
     36  close(d_backpipe[1]); 
     37  close(d_backpipe[0]); 
     38  close(d_uppipe[0]); 
     39} 
    240 
    341bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr) 
     
    1149} 
    1250 
     51void ChunkedSigningPipe::sendChunkToSign() 
     52{ 
     53  if(!d_mustSign) { 
     54    copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk)); 
     55    d_toSign.clear(); 
     56    return; 
     57  } 
     58  if(!d_toSign.empty()) { 
     59    chunk_t* toSign = new chunk_t(d_toSign); 
     60     
     61    if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign))  
     62      throw runtime_error("Partial write or error communicating to signing thread"); 
     63    d_outstanding++; 
     64  } 
     65  chunk_t* signedChunk; 
     66   
     67  while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) { 
     68    --d_outstanding; 
     69    copy(signedChunk->begin(), signedChunk->end(), back_inserter(d_chunk)); 
     70    delete signedChunk; 
     71  } 
     72   
     73  d_toSign.clear(); 
     74} 
     75 
     76void ChunkedSigningPipe::worker() 
     77{ 
     78  //int my_id = ++s_workerid; 
     79  // cout<<my_id<<" worker reporting!"<<endl; 
     80  chunk_t* chunk; 
     81   
     82  DNSSECKeeper dk; 
     83  int res; 
     84  for(;;) { 
     85    res=read(d_uppipe[0], &chunk, sizeof(chunk)); 
     86    if(!res) { 
     87      // cerr<<my_id<<" exiting"<<endl; 
     88      break; 
     89    } 
     90    if(res != sizeof(chunk)) 
     91      unixDie("error or partial read from ChunkedSigningPipe main thread"); 
     92    // cout<< my_id <<" worker signing!"<<endl; 
     93    addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved   
     94    if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk)) 
     95      unixDie("error writing back to ChunkedSigningPipe"); 
     96  } 
     97} 
     98 
    1399void ChunkedSigningPipe::flushToSign() 
    14100{ 
    15   addRRSigs(d_dk, d_db, d_signer, d_toSign); // should start returning sigs separately instead of interleaved 
    16   copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk)); 
     101  sendChunkToSign(); 
    17102  d_toSign.clear(); 
    18103} 
     
    20105vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final) 
    21106{ 
    22   if(final) 
     107  if(final) { 
     108    Utility::setBlocking(d_backpipe[0]); 
    23109    flushToSign(); 
    24    
     110  } 
    25111   
    26112  chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size()); 
  • trunk/pdns/pdns/signingpipe.hh

    r1959 r1960  
    22#define PDNS_SIGNINGPIPE 
    33#include <vector> 
     4#include <pthread.h> 
    45#include "dnsseckeeper.hh" 
    56#include "dns.hh" 
     
    1617  typedef vector<DNSResourceRecord> chunk_t;  
    1718   
    18   ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName) : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100) {} 
     19  ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3); 
     20  ~ChunkedSigningPipe(); 
    1921  bool submit(const DNSResourceRecord& rr); 
    2022  chunk_t getChunk(bool final=false); 
    2123private: 
    2224  void flushToSign();    
    23  
     25   
     26  void sendChunkToSign(); // dispatch chunk to worker 
     27  void worker(); 
     28   
     29  static void* helperWorker(void* p); 
    2430  chunk_t d_toSign, d_chunk; 
    2531  DNSSECKeeper& d_dk; 
     
    2733  string d_signer; 
    2834  chunk_t::size_type d_chunkrecords; 
     35   
     36  int d_uppipe[2], d_backpipe[2]; 
     37  int d_outstanding; 
     38  unsigned int d_numworkers; 
     39  vector<pthread_t> d_tids; 
     40  static AtomicCounter s_workerid; 
     41  bool d_mustSign; 
    2942}; 
    3043 
  • trunk/pdns/pdns/tcpreceiver.cc

    r1959 r1960  
    422422  bool NSEC3Zone=false; 
    423423   
     424   
    424425  DNSSECKeeper dk; 
     426  bool securedZone = dk.isSecuredZone(target); 
    425427  if(dk.getNSEC3PARAM(target, &ns3pr, &narrow)) { 
    426428    NSEC3Zone=true; 
     
    470472  } 
    471473 
    472  
    473474  if(!sd.db || sd.db==(DNSBackend *)-1) { 
    474475    L<<Logger::Error<<"Error determining backend for domain '"<<target<<"' trying to serve an AXFR"<<endl; 
     
    488489   
    489490  UeberBackend signatureDB;  
    490   ChunkedSigningPipe csp(dk, signatureDB, target); 
    491    
     491   
     492  // SOA *must* go out first, our signing pipe might reorder 
     493  DLOG(L<<"Sending out SOA"<<endl); 
    492494  DNSResourceRecord soa = makeDNSRRFromSOAData(sd); 
    493   csp.submit(soa); // an AXFR always starts with the SOA 
    494   DLOG(L<<"Sending out SOA"<<endl); 
    495    
     495  outpacket->addRecord(soa); 
     496  if(securedZone) 
     497    addRRSigs(dk, signatureDB, target, outpacket->getRRS()); 
     498   
     499  sendPacket(outpacket, outsock); 
     500  outpacket = getFreshAXFRPacket(q); 
     501   
     502  ChunkedSigningPipe csp(dk, signatureDB, target, securedZone, ::arg().asNum("signing-threads")); 
    496503   
    497504  typedef map<string, NSECXEntry, CanonicalCompare> nsecxrepo_t; 
     
    598605  } 
    599606   
    600    
    601607  DLOG(L<<"Done writing out records"<<endl); 
    602608  /* and terminate with yet again the SOA record */ 
    603609  outpacket=getFreshAXFRPacket(q); 
    604    
    605    
    606610  outpacket->addRecord(soa); 
    607611  sendPacket(outpacket, outsock);