Changeset 442
- Timestamp:
- 07/14/05 19:40:18 (8 years ago)
- Location:
- trunk/pdns/pdns
- Files:
-
- 4 modified
-
dnsreplay.cc (modified) (1 diff)
-
mtasker.cc (modified) (8 diffs)
-
mtasker.hh (modified) (2 diffs)
-
pdns_recursor.cc (modified) (13 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/pdns/pdns/dnsreplay.cc
r441 r442 303 303 304 304 nanosleep(&tosleep, 0); 305 lastsent=pr.d_pheader.ts; 305 306 } 306 lastsent=pr.d_pheader.ts; 307 307 308 // cout<<"sending!"<<endl; 308 309 s_socket->sendTo(string(pr.d_payload, pr.d_payload + pr.d_len), remote); -
trunk/pdns/pdns/mtasker.cc
r234 r442 167 167 w.tid=d_tid; 168 168 169 if(d_waiters.count(key)) { // there was already an exact same waiter 170 return -1; 171 } 169 172 d_waiters[key]=w; 170 173 171 swapcontext(d_waiters[key].context,&d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first 174 if(swapcontext(d_waiters[key].context,&d_kernel)) { // 'A' will return here when 'key' has arrived, hands over control to kernel first 175 perror("swapcontext"); 176 exit(EXIT_FAILURE); // no way we can deal with this 177 } 172 178 if(val && d_waitstatus==Answer) 173 179 *val=d_waitval; … … 182 188 { 183 189 d_runQueue.push(d_tid); 184 swapcontext(d_threads[d_tid],&d_kernel); // give control to the kernel 190 if(swapcontext(d_threads[d_tid].first ,&d_kernel) < 0) { // give control to the kernel 191 perror("swapcontext in yield"); 192 exit(EXIT_FAILURE); 193 } 185 194 } 186 195 … … 194 203 { 195 204 if(!d_waiters.count(key)) { 205 // cout<<"Event sent nobody was waiting for!"<<endl; 196 206 return 0; 197 207 } … … 205 215 206 216 d_waiters.erase(key); // removes the waitpoint 207 swapcontext(&d_kernel,userspace); // swaps back to the above point 'A' 208 217 if(swapcontext(&d_kernel,userspace)) { // swaps back to the above point 'A' 218 perror("swapcontext in sendEvent"); 219 exit(EXIT_FAILURE); 220 } 209 221 delete userspace; 210 222 return 1; … … 216 228 \param val A void pointer that can be used to pass data to the thread 217 229 */ 218 template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val )230 template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val, const string& name) 219 231 { 220 232 ucontext_t *uc=new ucontext_t; … … 230 242 makecontext (uc, (void (*)(void))threadWrapper, 4, this, start, d_maxtid, val); 231 243 #endif 232 d_threads[d_maxtid]= uc;244 d_threads[d_maxtid]=make_pair(uc, name); 233 245 d_runQueue.push(d_maxtid++); // will run at next schedule invocation 234 246 } … … 250 262 if(!d_runQueue.empty()) { 251 263 d_tid=d_runQueue.front(); 252 swapcontext(&d_kernel, d_threads[d_tid]); 264 if(swapcontext(&d_kernel, d_threads[d_tid].first)) { 265 perror("swapcontext in schedule"); 266 exit(EXIT_FAILURE); 267 } 268 253 269 d_runQueue.pop(); 254 270 return true; 255 271 } 256 272 if(!d_zombiesQueue.empty()) { 257 delete[] (char *)d_threads[d_zombiesQueue.front()] ->uc_stack.ss_sp;258 delete d_threads[d_zombiesQueue.front()] ;273 delete[] (char *)d_threads[d_zombiesQueue.front()].first->uc_stack.ss_sp; 274 delete d_threads[d_zombiesQueue.front()].first; 259 275 d_threads.erase(d_zombiesQueue.front()); 260 276 d_zombiesQueue.pop(); … … 266 282 if(i->second.ttd && i->second.ttd<now) { 267 283 d_waitstatus=TimeOut; 268 swapcontext(&d_kernel,i->second.context); // swaps back to the above point 'A' 284 if(swapcontext(&d_kernel,i->second.context)) { // swaps back to the above point 'A' 285 perror("swapcontext in schedule2"); 286 exit(EXIT_FAILURE); 287 } 269 288 delete i->second.context; 270 289 d_waiters.erase(i->first); // removes the waitpoint -
trunk/pdns/pdns/mtasker.hh
r234 r442 53 53 typedef std::map<EventKey,Waiter> waiters_t; 54 54 waiters_t d_waiters; 55 std::map<int,ucontext_t*> d_threads; 55 typedef std::map<int,pair<ucontext_t*,string> > mthreads_t; 56 mthreads_t d_threads; 56 57 int d_tid; 57 58 int d_maxtid; … … 77 78 int sendEvent(const EventKey& key, const EventVal* val=0); 78 79 void getEvents(std::vector<EventKey>& events); 79 void makeThread(tfunc_t *start, void* val );80 void makeThread(tfunc_t *start, void* val, const string& name=""); 80 81 bool schedule(); 81 82 bool noProcesses(); 82 83 unsigned int numProcesses(); 83 84 int getTid(); 85 void setTitle(const string& name) 86 { 87 d_threads[d_tid].second=name; 88 } 89 84 90 private: 85 91 static void threadWrapper(MTasker *self, tfunc_t *tf, int tid, void* val); -
trunk/pdns/pdns/pdns_recursor.cc
r440 r442 123 123 d_tcpclientwritesocks[sock->getHandle()]=pident; 124 124 125 if(!MT->waitEvent(pident,&packet,1)) { // timeout 125 int ret=MT->waitEvent(pident,&packet,1); 126 if(!ret || ret==-1) { // timeout 126 127 d_tcpclientwritesocks.erase(sock->getHandle()); 127 return 0; 128 } 129 // cerr<<"asendtcp happy"<<endl; 130 return 1; 131 } 132 128 } 129 return ret; 130 } 131 132 // -1 is error, 0 is timeout, 1 is success 133 133 int arecvtcp(string& data, int len, Socket* sock) 134 134 { … … 142 142 d_tcpclientreadsocks[sock->getHandle()]=pident; 143 143 144 if(!MT->waitEvent(pident,&data,1)) { // timeout 144 int ret=MT->waitEvent(pident,&data,1); 145 if(!ret || ret==-1) { // timeout 145 146 d_tcpclientreadsocks.erase(sock->getHandle()); 146 return 0; 147 } 148 149 // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl; 150 return 1; 147 } 148 return ret; 151 149 } 152 150 153 151 154 152 /* these two functions are used by LWRes */ 153 // -1 is error, > 1 is success 155 154 int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 156 155 { … … 158 157 } 159 158 159 // -1 is error, 0 is timeout, 1 is success 160 160 int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id) 161 161 { … … 165 165 166 166 string packet; 167 if(!MT->waitEvent(pident,&packet,1)) { // timeout 168 return 0; 169 } 170 171 *d_len=packet.size(); 172 memcpy(data,packet.c_str(),min(len,*d_len)); 173 174 return 1; 167 int ret=MT->waitEvent(pident,&packet,1); 168 if(ret > 0) { 169 *d_len=packet.size(); 170 memcpy(data,packet.c_str(),min(len,*d_len)); 171 } 172 173 return ret; 175 174 } 176 175 … … 221 220 222 221 delete (DNSPacket *)p; 223 222 224 223 vector<DNSResourceRecord>ret; 225 224 DNSPacket *R=P.replyPacket(); 226 225 R->setA(false); 227 226 R->setRA(true); 228 227 // MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName()); 229 228 SyncRes sr; 230 229 if(!quiet) … … 274 273 } 275 274 275 // MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName()); 276 276 if(!quiet) { 277 277 L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName(); … … 419 419 statsWanted=true; 420 420 } 421 422 void usr2Handler(int) 423 { 424 SyncRes::setLog(true); 425 } 426 421 427 422 428 … … 558 564 } 559 565 signal(SIGUSR1,usr1Handler); 566 signal(SIGUSR2,usr2Handler); 560 567 561 568 writePid(); … … 587 594 588 595 if(!((counter++)%100)) 589 MT->makeThread(houseKeeping,0 );596 MT->makeThread(houseKeeping,0,"housekeeping"); 590 597 if(statsWanted) 591 598 doStats(); … … 690 697 P.setSocket(*i); 691 698 P.d_tcp=false; 692 MT->makeThread(startDoResolve,(void*)new DNSPacket(P) );699 MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp"); 693 700 } 694 701 } … … 738 745 } 739 746 else { 740 cerr<<"when reading ret="<<ret<<endl; 747 // cerr<<"when reading ret="<<ret<<endl; 748 // XXX FIXME I think some stuff needs to happen here - like send an EOF event 741 749 } 742 750 } … … 763 771 } 764 772 else { 765 cerr<<"ret="<<ret<<" when writing"<<endl; 773 // cerr<<"ret="<<ret<<" when writing"<<endl; 774 // XXX FIXME I think some stuff needs to happen here - like send an EOF event 766 775 } 767 776 } … … 829 838 else { 830 839 ++qcounter; 831 MT->makeThread(startDoResolve,(void*)new DNSPacket(P) );840 MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp"); 832 841 } 833 842 }