Changeset 1316

Show
Ignore:
Timestamp:
11/27/08 23:26:42 (21 months ago)
Author:
ahu
Message:

multithread the packet receiver - for great quad core justice

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • trunk/pdns/pdns/common_startup.cc

    r1285 r1316  
    6565  ::arg().set("default-soa-name","name to insert in the SOA record if none set in the backend")="a.misconfigured.powerdns.server"; 
    6666  ::arg().set("distributor-threads","Default number of Distributor (backend) threads to start")="3"; 
     67  ::arg().set("receiver-threads","Default number of Distributor (backend) threads to start")="1"; 
    6768  ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500";  
    6869  ::arg().set("recursor","If recursion is desired, IP address of a recursing nameserver")="no";  
     
    203204} 
    204205 
     206static DNSDistributor* g_distributor; 
     207static pthread_mutex_t d_distributorlock =PTHREAD_MUTEX_INITIALIZER; 
     208static bool g_mustlockdistributor; 
    205209 
    206210//! The qthread receives questions over the internet via the Nameserver class, and hands them to the Distributor for futher processing 
    207 void *qthread(void *p) 
    208 { 
    209   DNSDistributor *D=static_cast<DNSDistributor *>(p); 
    210  
     211void *qthread(void *number) 
     212{ 
    211213  DNSPacket *P; 
    212214 
     
    226228 
    227229  for(;;) { 
    228     if(!((numreceived++)%50)) { // maintenance tasks 
    229       S.set("latency",(int)avg_latency); 
    230       int qcount, acount; 
    231       D->getQueueSizes(qcount, acount); 
    232       S.set("qsize-q",qcount); 
     230    if(number==0) { 
     231      if(!((numreceived++)%50)) { // maintenance tasks 
     232        S.set("latency",(int)avg_latency); 
     233        int qcount, acount; 
     234        g_distributor->getQueueSizes(qcount, acount); 
     235        S.set("qsize-q",qcount); 
     236      } 
    233237    } 
    234      
     238 
    235239    if(!(P=N->receive(&question))) { // receive a packet         inline 
    236240      continue;                    // packet was broken, try again 
     
    265269      continue; 
    266270    } 
    267  
    268     D->question(P, &sendout); // otherwise, give to the distributor 
     271    if(g_mustlockdistributor) { 
     272      Lock l(&d_distributorlock); 
     273      g_distributor->question(P, &sendout); // otherwise, give to the distributor 
     274    } 
     275    else 
     276      g_distributor->question(P, &sendout); // otherwise, give to the distributor 
    269277  } 
    270278  return 0; 
     
    319327     
    320328  //  fork(); (this worked :-)) 
    321   DNSDistributor *D= new DNSDistributor(::arg().asNum("distributor-threads")); // the big dispatcher! 
    322   pthread_create(&qtid,0,qthread,static_cast<void *>(D)); // receives packets 
     329  g_distributor = new DNSDistributor(::arg().asNum("distributor-threads")); // the big dispatcher! 
     330  if(::arg().asNum("receiver-threads") > 1) { 
     331    g_mustlockdistributor=true; 
     332  } 
     333  for(int n=0; n < ::arg().asNum("receiver-threads"); ++n) 
     334    pthread_create(&qtid,0,qthread, reinterpret_cast<void *>(n)); // receives packets 
    323335 
    324336  void *p;