root/trunk/pdns/pdns/inflighter.cc @ 1635

Revision 1635, 6.2 KB (checked in by ahu, 3 years ago)

comment the 'inflighter' a bit, plus add very nice statistical output to dnsbulktest
see  http://bert-hubert.blogspot.com/2010/06/better-statistical-regression-tests.html

Line 
1#include <vector>
2#include <deque>
3#include <iostream>
4#include <boost/foreach.hpp>
5#include <boost/multi_index_container.hpp>
6#include <boost/format.hpp>
7#include <sys/time.h>
8#include <time.h>
9#include "iputils.hh"
10#include "statbag.hh"
11#include <sys/socket.h>
12
13using namespace std;
14using namespace boost::multi_index;
15
16struct TimeTag{};
17
18template<typename Container, typename SenderReceiver> class Inflighter
19{
20public:
21  Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
22  {
23    d_burst = 2;
24    d_maxInFlight = 5;
25    d_timeoutSeconds = 3;
26    d_unexpectedResponse = d_timeouts = 0;
27  }
28  void init()
29  {
30    d_iter = d_container.begin();
31    d_init=true;
32  }
33 
34  bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
35 
36  unsigned int d_maxInFlight;
37  unsigned int d_timeoutSeconds;
38  int d_burst;
39 
40  uint64_t getTimeouts()
41  {
42    return d_timeouts;
43  }
44 
45  uint64_t getUnexpecteds()
46  {
47    return d_unexpectedResponse;
48  }
49 
50private:
51  struct TTDItem
52  {
53    typename Container::iterator iter;
54    typename SenderReceiver::Identifier id;
55    struct timeval ttd;
56  };
57
58  typedef multi_index_container<
59    TTDItem,
60    indexed_by<
61      ordered_unique<
62        member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
63      >,
64      ordered_non_unique<
65        tag<TimeTag>,
66        member<TTDItem, struct timeval, &TTDItem::ttd>
67      >
68    >
69  >ttdwatch_t; 
70 
71  Container& d_container;
72  SenderReceiver& d_sr;
73 
74  ttdwatch_t d_ttdWatch;
75  typename Container::iterator d_iter;
76  bool d_init;
77 
78  uint64_t d_unexpectedResponse, d_timeouts;
79};
80
81template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
82{
83  if(!d_init)
84    init();
85   
86  for(;;) {
87    int burst = 0;
88
89    // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
90    while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { 
91      TTDItem ttdi;
92      ttdi.iter = d_iter++;
93      ttdi.id = d_sr.send(*ttdi.iter);
94      gettimeofday(&ttdi.ttd, 0);
95      ttdi.ttd.tv_sec += d_timeoutSeconds;
96      if(d_ttdWatch.count(ttdi.id)) {
97//        cerr<<"DUPLICATE INSERT!"<<endl;
98      }
99      d_ttdWatch.insert(ttdi);
100     
101      if(++burst == d_burst)
102        break;
103    }
104    int processed=0;
105   
106   
107    // if there are queries in flight, handle responses
108    if(!d_ttdWatch.empty()) {
109      // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;           
110      typename SendReceive::Answer answer;
111      typename SendReceive::Identifier id;
112     
113      // get as many answers as available - 'receive' should block for a short while to wait for an answer
114      while(d_sr.receive(id, answer)) {
115        typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
116
117        if(ival != d_ttdWatch.end()) { // found something!
118          ++processed;
119          d_sr.deliverAnswer(*ival->iter, answer);    // deliver to sender/receiver
120          d_ttdWatch.erase(ival);
121          break; // we can send new questions!
122        }
123        else {
124          // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
125          d_unexpectedResponse++;
126        }
127      }
128   
129     
130      if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
131        struct timeval now;
132        gettimeofday(&now, 0);
133       
134        typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
135        waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
136
137        // this provides a list of items sorted by age
138        for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
139          if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
140            d_sr.deliverTimeout(valiter->id);  // so backend can release id
141            waiters_index.erase(valiter++);
142            // cerr<<"Have timeout for id="<< valiter->id <<endl;
143            d_timeouts++;
144          }
145          else 
146            break; // if this one was too new, rest will be too
147        }
148      }
149    }
150    if(d_ttdWatch.empty() && d_iter == d_container.end())
151      break;
152  }
153  return false;
154}
155
156#if 0
157StatBag S;
158
159struct SendReceive
160{
161  typedef int Identifier;
162  typedef int Answer;
163  ComboAddress d_remote;
164  int d_socket;
165  int d_id;
166 
167  SendReceive()
168  {
169    d_id = 0;
170    d_socket = socket(AF_INET, SOCK_DGRAM, 0);
171    int val=1;
172    setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
173   
174    ComboAddress local("0.0.0.0", 1024);
175    bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
176   
177    char buf[512];
178   
179    socklen_t remotelen=sizeof(d_remote);
180    cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
181    int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
182    cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
183    Utility::setNonBlocking(d_socket);
184    connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
185  }
186 
187  ~SendReceive()
188  {
189    ::send(d_socket, "done\r\n", 6, 0);
190  }
191 
192  Identifier send(int& i)
193  {
194    cerr<<"Sending a '"<<i<<"'"<<endl;
195    string msg = (boost::format("%d %d\n") % d_id % i).str();
196    ::send(d_socket, msg.c_str(), msg.length(), 0);
197    return d_id++;
198  }
199 
200  bool receive(Identifier& id, int& i)
201  {
202    if(waitForData(d_socket, 0, 500000) > 0) {
203      char buf[512];
204   
205      int len = recv(d_socket, buf, sizeof(buf), 0);
206      string msg(buf, len);
207      if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
208        throw runtime_error("Invalid input");
209      }
210      return 1;
211    }
212    return 0;
213  }
214 
215  void deliverAnswer(int& i, int j)
216  {
217    cerr<<"We sent "<<i<<", got back: "<<j<<endl;
218  }
219};
220
221
222int main()
223{
224  vector<int> numbers;
225  SendReceive sr;
226  Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
227
228  for(int n=0; n < 100; ++n)
229    numbers.push_back(n*n);
230
231
232  for(;;) {
233    try {
234      inflighter.run();
235      break;
236    }
237    catch(exception& e) {
238      cerr<<"Caught exception: "<<e.what()<<endl;
239    }
240  }
241 
242}
243#endif
Note: See TracBrowser for help on using the browser.