Changeset 1960
- Timestamp:
- 02/02/11 00:12:40 (2 years ago)
- Location:
- trunk/pdns/pdns
- Files:
-
- 4 modified
-
common_startup.cc (modified) (1 diff)
-
signingpipe.cc (modified) (3 diffs)
-
signingpipe.hh (modified) (3 diffs)
-
tcpreceiver.cc (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/pdns/pdns/common_startup.cc
r1950 r1960 67 67 ::arg().set("default-soa-name","name to insert in the SOA record if none set in the backend")="a.misconfigured.powerdns.server"; 68 68 ::arg().set("distributor-threads","Default number of Distributor (backend) threads to start")="3"; 69 ::arg().set("signing-threads","Default number of signer threads to start")="3"; 69 70 ::arg().set("receiver-threads","Default number of Distributor (backend) threads to start")="1"; 70 71 ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500"; -
trunk/pdns/pdns/signingpipe.cc
r1959 r1960 1 1 #include "signingpipe.hh" 2 3 AtomicCounter ChunkedSigningPipe::s_workerid; 4 5 void* ChunkedSigningPipe::helperWorker(void* p) 6 { 7 ChunkedSigningPipe* us = (ChunkedSigningPipe*)p; 8 us->worker(); 9 return 0; 10 } 11 12 ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers) 13 : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers), 14 d_mustSign(mustSign) 15 { 16 if(!d_mustSign) 17 return; 18 if(pipe(d_uppipe) < 0 || pipe(d_backpipe)) 19 throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe"); 20 21 Utility::setNonBlocking(d_backpipe[0]); 22 for(unsigned int n=0; n < d_numworkers; ++n) { 23 pthread_create(&d_tids[n], 0, helperWorker, (void*) this); 24 } 25 } 26 27 ChunkedSigningPipe::~ChunkedSigningPipe() 28 { 29 if(!d_mustSign) 30 return; 31 close(d_uppipe[1]); // this will trigger all threads to exit 32 void* res; 33 for(unsigned int n = 0; n < d_numworkers; ++n) 34 pthread_join(d_tids[n], &res); 35 36 close(d_backpipe[1]); 37 close(d_backpipe[0]); 38 close(d_uppipe[0]); 39 } 2 40 3 41 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr) … … 11 49 } 12 50 51 void ChunkedSigningPipe::sendChunkToSign() 52 { 53 if(!d_mustSign) { 54 copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk)); 55 d_toSign.clear(); 56 return; 57 } 58 if(!d_toSign.empty()) { 59 chunk_t* toSign = new chunk_t(d_toSign); 60 61 if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign)) 62 throw runtime_error("Partial write or error communicating to signing thread"); 63 d_outstanding++; 64 } 65 chunk_t* signedChunk; 66 67 while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) { 68 --d_outstanding; 69 copy(signedChunk->begin(), signedChunk->end(), back_inserter(d_chunk)); 70 delete signedChunk; 71 } 72 73 d_toSign.clear(); 74 } 75 76 void ChunkedSigningPipe::worker() 77 { 78 //int my_id = ++s_workerid; 79 // cout<<my_id<<" worker reporting!"<<endl; 80 chunk_t* chunk; 81 82 DNSSECKeeper dk; 83 int res; 84 for(;;) { 85 res=read(d_uppipe[0], &chunk, sizeof(chunk)); 86 if(!res) { 87 // cerr<<my_id<<" exiting"<<endl; 88 break; 89 } 90 if(res != sizeof(chunk)) 91 unixDie("error or partial read from ChunkedSigningPipe main thread"); 92 // cout<< my_id <<" worker signing!"<<endl; 93 addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved 94 if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk)) 95 unixDie("error writing back to ChunkedSigningPipe"); 96 } 97 } 98 13 99 void ChunkedSigningPipe::flushToSign() 14 100 { 15 addRRSigs(d_dk, d_db, d_signer, d_toSign); // should start returning sigs separately instead of interleaved 16 copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk)); 101 sendChunkToSign(); 17 102 d_toSign.clear(); 18 103 } … … 20 105 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final) 21 106 { 22 if(final) 107 if(final) { 108 Utility::setBlocking(d_backpipe[0]); 23 109 flushToSign(); 24 110 } 25 111 26 112 chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size()); -
trunk/pdns/pdns/signingpipe.hh
r1959 r1960 2 2 #define PDNS_SIGNINGPIPE 3 3 #include <vector> 4 #include <pthread.h> 4 5 #include "dnsseckeeper.hh" 5 6 #include "dns.hh" … … 16 17 typedef vector<DNSResourceRecord> chunk_t; 17 18 18 ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName) : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100) {} 19 ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3); 20 ~ChunkedSigningPipe(); 19 21 bool submit(const DNSResourceRecord& rr); 20 22 chunk_t getChunk(bool final=false); 21 23 private: 22 24 void flushToSign(); 23 25 26 void sendChunkToSign(); // dispatch chunk to worker 27 void worker(); 28 29 static void* helperWorker(void* p); 24 30 chunk_t d_toSign, d_chunk; 25 31 DNSSECKeeper& d_dk; … … 27 33 string d_signer; 28 34 chunk_t::size_type d_chunkrecords; 35 36 int d_uppipe[2], d_backpipe[2]; 37 int d_outstanding; 38 unsigned int d_numworkers; 39 vector<pthread_t> d_tids; 40 static AtomicCounter s_workerid; 41 bool d_mustSign; 29 42 }; 30 43 -
trunk/pdns/pdns/tcpreceiver.cc
r1959 r1960 422 422 bool NSEC3Zone=false; 423 423 424 424 425 DNSSECKeeper dk; 426 bool securedZone = dk.isSecuredZone(target); 425 427 if(dk.getNSEC3PARAM(target, &ns3pr, &narrow)) { 426 428 NSEC3Zone=true; … … 470 472 } 471 473 472 473 474 if(!sd.db || sd.db==(DNSBackend *)-1) { 474 475 L<<Logger::Error<<"Error determining backend for domain '"<<target<<"' trying to serve an AXFR"<<endl; … … 488 489 489 490 UeberBackend signatureDB; 490 ChunkedSigningPipe csp(dk, signatureDB, target); 491 491 492 // SOA *must* go out first, our signing pipe might reorder 493 DLOG(L<<"Sending out SOA"<<endl); 492 494 DNSResourceRecord soa = makeDNSRRFromSOAData(sd); 493 csp.submit(soa); // an AXFR always starts with the SOA 494 DLOG(L<<"Sending out SOA"<<endl); 495 495 outpacket->addRecord(soa); 496 if(securedZone) 497 addRRSigs(dk, signatureDB, target, outpacket->getRRS()); 498 499 sendPacket(outpacket, outsock); 500 outpacket = getFreshAXFRPacket(q); 501 502 ChunkedSigningPipe csp(dk, signatureDB, target, securedZone, ::arg().asNum("signing-threads")); 496 503 497 504 typedef map<string, NSECXEntry, CanonicalCompare> nsecxrepo_t; … … 598 605 } 599 606 600 601 607 DLOG(L<<"Done writing out records"<<endl); 602 608 /* and terminate with yet again the SOA record */ 603 609 outpacket=getFreshAXFRPacket(q); 604 605 606 610 outpacket->addRecord(soa); 607 611 sendPacket(outpacket, outsock);