Changeset 1635 for trunk/pdns/pdns/inflighter.cc
- Timestamp:
- 06/13/10 22:45:28 (3 years ago)
- Files:
-
- 1 modified
-
trunk/pdns/pdns/inflighter.cc (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/pdns/pdns/inflighter.cc
r1634 r1635 31 31 d_init=true; 32 32 } 33 void run(); 33 34 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception 34 35 35 36 unsigned int d_maxInFlight; … … 78 79 }; 79 80 80 template<typename Container, typename SendReceive> voidInflighter<Container, SendReceive>::run()81 template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run() 81 82 { 82 83 if(!d_init) 83 84 init(); 84 85 85 // cout << "Have "<<d_container.size() << " things to do!"<<endl;86 87 86 for(;;) { 88 87 int burst = 0; 88 89 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth) 89 90 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { 90 91 TTDItem ttdi; … … 93 94 gettimeofday(&ttdi.ttd, 0); 94 95 ttdi.ttd.tv_sec += d_timeoutSeconds; 95 96 if(d_ttdWatch.count(ttdi.id)) { 97 // cerr<<"DUPLICATE INSERT!"<<endl; 98 } 96 99 d_ttdWatch.insert(ttdi); 97 100 … … 100 103 } 101 104 int processed=0; 105 106 107 // if there are queries in flight, handle responses 102 108 if(!d_ttdWatch.empty()) { 103 109 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl; … … 105 111 typename SendReceive::Identifier id; 106 112 113 // get as many answers as available - 'receive' should block for a short while to wait for an answer 107 114 while(d_sr.receive(id, answer)) { 108 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); 109 if(ival != d_ttdWatch.end()) { 115 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for 116 117 if(ival != d_ttdWatch.end()) { // found something! 110 118 ++processed; 111 // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl; 112 d_sr.deliverAnswer(*ival->iter, answer); 119 d_sr.deliverAnswer(*ival->iter, answer); // deliver to sender/receiver 113 120 d_ttdWatch.erase(ival); 114 121 break; // we can send new questions! … … 121 128 122 129 123 if(!processed ) { // no new responses, time for some cleanup130 if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch 124 131 struct timeval now; 125 132 gettimeofday(&now, 0); … … 128 135 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch); 129 136 137 // this provides a list of items sorted by age 130 138 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) { 131 139 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) { … … 136 144 } 137 145 else 138 break; 146 break; // if this one was too new, rest will be too 139 147 } 140 148 } … … 143 151 break; 144 152 } 153 return false; 145 154 } 146 155