Changeset 442

Show
Ignore:
Timestamp:
07/14/05 19:40:18 (8 years ago)
Author:
ahu
Message:

add titles to mthreads
fix confusion in mtasker when waiting for duplicated keys
clarified error handling in waiting for events
improve error checking in mtasker
fix dnsreplay timing fix

Location:
trunk/pdns/pdns
Files:
4 modified

Legend:

Unmodified
Added
Removed
  • trunk/pdns/pdns/dnsreplay.cc

    r441 r442  
    303303           
    304304          nanosleep(&tosleep, 0); 
     305          lastsent=pr.d_pheader.ts; 
    305306        } 
    306         lastsent=pr.d_pheader.ts; 
     307 
    307308        //      cout<<"sending!"<<endl; 
    308309        s_socket->sendTo(string(pr.d_payload, pr.d_payload + pr.d_len), remote); 
  • trunk/pdns/pdns/mtasker.cc

    r234 r442  
    167167  w.tid=d_tid; 
    168168   
     169  if(d_waiters.count(key)) { // there was already an exact same waiter 
     170    return -1; 
     171  } 
    169172  d_waiters[key]=w; 
    170173   
    171   swapcontext(d_waiters[key].context,&d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first 
     174  if(swapcontext(d_waiters[key].context,&d_kernel)) { // 'A' will return here when 'key' has arrived, hands over control to kernel first 
     175    perror("swapcontext"); 
     176    exit(EXIT_FAILURE); // no way we can deal with this  
     177  } 
    172178  if(val && d_waitstatus==Answer)  
    173179    *val=d_waitval; 
     
    182188{ 
    183189  d_runQueue.push(d_tid); 
    184   swapcontext(d_threads[d_tid],&d_kernel); // give control to the kernel 
     190  if(swapcontext(d_threads[d_tid].first ,&d_kernel) < 0) { // give control to the kernel 
     191    perror("swapcontext in  yield"); 
     192    exit(EXIT_FAILURE); 
     193  } 
    185194} 
    186195 
     
    194203{ 
    195204  if(!d_waiters.count(key)) { 
     205    //    cout<<"Event sent nobody was waiting for!"<<endl; 
    196206    return 0; 
    197207  } 
     
    205215   
    206216  d_waiters.erase(key);             // removes the waitpoint  
    207   swapcontext(&d_kernel,userspace); // swaps back to the above point 'A' 
    208    
     217  if(swapcontext(&d_kernel,userspace)) { // swaps back to the above point 'A' 
     218    perror("swapcontext in sendEvent"); 
     219    exit(EXIT_FAILURE); 
     220  } 
    209221  delete userspace; 
    210222  return 1; 
     
    216228    \param val A void pointer that can be used to pass data to the thread 
    217229*/ 
    218 template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val) 
     230template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val, const string& name) 
    219231{ 
    220232  ucontext_t *uc=new ucontext_t; 
     
    230242  makecontext (uc, (void (*)(void))threadWrapper, 4, this, start, d_maxtid, val); 
    231243#endif 
    232   d_threads[d_maxtid]=uc; 
     244  d_threads[d_maxtid]=make_pair(uc, name); 
    233245  d_runQueue.push(d_maxtid++); // will run at next schedule invocation 
    234246} 
     
    250262  if(!d_runQueue.empty()) { 
    251263    d_tid=d_runQueue.front(); 
    252     swapcontext(&d_kernel, d_threads[d_tid]); 
     264    if(swapcontext(&d_kernel, d_threads[d_tid].first)) { 
     265      perror("swapcontext in schedule"); 
     266      exit(EXIT_FAILURE); 
     267    } 
     268       
    253269    d_runQueue.pop(); 
    254270    return true; 
    255271  } 
    256272  if(!d_zombiesQueue.empty()) { 
    257     delete[] (char *)d_threads[d_zombiesQueue.front()]->uc_stack.ss_sp; 
    258     delete d_threads[d_zombiesQueue.front()]; 
     273    delete[] (char *)d_threads[d_zombiesQueue.front()].first->uc_stack.ss_sp; 
     274    delete d_threads[d_zombiesQueue.front()].first; 
    259275    d_threads.erase(d_zombiesQueue.front()); 
    260276    d_zombiesQueue.pop(); 
     
    266282      if(i->second.ttd && i->second.ttd<now) { 
    267283        d_waitstatus=TimeOut; 
    268         swapcontext(&d_kernel,i->second.context); // swaps back to the above point 'A' 
     284        if(swapcontext(&d_kernel,i->second.context)) { // swaps back to the above point 'A' 
     285          perror("swapcontext in schedule2"); 
     286          exit(EXIT_FAILURE); 
     287        } 
    269288        delete i->second.context;               
    270289        d_waiters.erase(i->first);                  // removes the waitpoint  
  • trunk/pdns/pdns/mtasker.hh

    r234 r442  
    5353  typedef std::map<EventKey,Waiter> waiters_t; 
    5454  waiters_t d_waiters; 
    55   std::map<int,ucontext_t*> d_threads; 
     55  typedef std::map<int,pair<ucontext_t*,string> > mthreads_t; 
     56  mthreads_t d_threads; 
    5657  int d_tid; 
    5758  int d_maxtid; 
     
    7778  int sendEvent(const EventKey& key, const EventVal* val=0); 
    7879  void getEvents(std::vector<EventKey>& events); 
    79   void makeThread(tfunc_t *start, void* val); 
     80  void makeThread(tfunc_t *start, void* val, const string& name=""); 
    8081  bool schedule(); 
    8182  bool noProcesses(); 
    8283  unsigned int numProcesses(); 
    8384  int getTid();  
     85  void setTitle(const string& name) 
     86  { 
     87    d_threads[d_tid].second=name; 
     88  } 
     89 
    8490private: 
    8591  static void threadWrapper(MTasker *self, tfunc_t *tf, int tid, void* val); 
  • trunk/pdns/pdns/pdns_recursor.cc

    r440 r442  
    123123  d_tcpclientwritesocks[sock->getHandle()]=pident; 
    124124 
    125   if(!MT->waitEvent(pident,&packet,1)) { // timeout 
     125  int ret=MT->waitEvent(pident,&packet,1); 
     126  if(!ret || ret==-1) { // timeout 
    126127    d_tcpclientwritesocks.erase(sock->getHandle()); 
    127     return 0;  
    128   } 
    129   //  cerr<<"asendtcp happy"<<endl; 
    130   return 1; 
    131 } 
    132  
     128  } 
     129  return ret; 
     130} 
     131 
     132// -1 is error, 0 is timeout, 1 is success 
    133133int arecvtcp(string& data, int len, Socket* sock)  
    134134{ 
     
    142142  d_tcpclientreadsocks[sock->getHandle()]=pident; 
    143143 
    144   if(!MT->waitEvent(pident,&data,1)) { // timeout 
     144  int ret=MT->waitEvent(pident,&data,1); 
     145  if(!ret || ret==-1) { // timeout 
    145146    d_tcpclientreadsocks.erase(sock->getHandle()); 
    146     return 0;  
    147   } 
    148  
    149   // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl; 
    150   return 1; 
     147  } 
     148  return ret; 
    151149} 
    152150 
    153151 
    154152/* these two functions are used by LWRes */ 
     153// -1 is error, > 1 is success 
    155154int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id)  
    156155{ 
     
    158157} 
    159158 
     159// -1 is error, 0 is timeout, 1 is success 
    160160int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id) 
    161161{ 
     
    165165 
    166166  string packet; 
    167   if(!MT->waitEvent(pident,&packet,1)) { // timeout 
    168     return 0;  
    169   } 
    170  
    171   *d_len=packet.size(); 
    172   memcpy(data,packet.c_str(),min(len,*d_len)); 
    173  
    174   return 1; 
     167  int ret=MT->waitEvent(pident,&packet,1); 
     168  if(ret > 0) { 
     169    *d_len=packet.size(); 
     170    memcpy(data,packet.c_str(),min(len,*d_len)); 
     171  } 
     172 
     173  return ret; 
    175174} 
    176175 
     
    221220 
    222221    delete (DNSPacket *)p; 
    223  
     222     
    224223    vector<DNSResourceRecord>ret; 
    225224    DNSPacket *R=P.replyPacket(); 
    226225    R->setA(false); 
    227226    R->setRA(true); 
    228  
     227    //    MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName()); 
    229228    SyncRes sr; 
    230229    if(!quiet) 
     
    274273    } 
    275274 
     275    //    MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName()); 
    276276    if(!quiet) { 
    277277      L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName(); 
     
    419419  statsWanted=true; 
    420420} 
     421 
     422void usr2Handler(int) 
     423{ 
     424  SyncRes::setLog(true); 
     425} 
     426 
    421427 
    422428 
     
    558564    } 
    559565    signal(SIGUSR1,usr1Handler); 
     566    signal(SIGUSR2,usr2Handler); 
    560567 
    561568    writePid(); 
     
    587594       
    588595      if(!((counter++)%100))  
    589         MT->makeThread(houseKeeping,0); 
     596        MT->makeThread(houseKeeping,0,"housekeeping"); 
    590597      if(statsWanted) 
    591598        doStats(); 
     
    690697              P.setSocket(*i); 
    691698              P.d_tcp=false; 
    692               MT->makeThread(startDoResolve,(void*)new DNSPacket(P)); 
     699              MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp"); 
    693700            } 
    694701          } 
     
    738745          } 
    739746          else { 
    740             cerr<<"when reading ret="<<ret<<endl; 
     747            //      cerr<<"when reading ret="<<ret<<endl; 
     748            // XXX FIXME I think some stuff needs to happen here - like send an EOF event 
    741749          } 
    742750        } 
     
    763771          } 
    764772          else {  
    765             cerr<<"ret="<<ret<<" when writing"<<endl; 
     773            //      cerr<<"ret="<<ret<<" when writing"<<endl; 
     774            // XXX FIXME I think some stuff needs to happen here - like send an EOF event 
    766775          } 
    767776        } 
     
    829838                else { 
    830839                  ++qcounter; 
    831                   MT->makeThread(startDoResolve,(void*)new DNSPacket(P)); 
     840                  MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp"); 
    832841                } 
    833842              }