root/trunk/pdns/pdns/inflighter.cc @ 1634

Revision 1634, 5.6 KB (checked in by ahu, 3 years ago)

add timeout reporting to inflighter

Line 
#include <stdint.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include <deque>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>
#include <boost/foreach.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/format.hpp>
#include "iputils.hh"
#include "statbag.hh"
12
13using namespace std;
14using namespace boost::multi_index;
15
// Empty tag type; used to name the time-ordered index of the in-flight table.
struct TimeTag
{
};
17
18template<typename Container, typename SenderReceiver> class Inflighter
19{
20public:
21  Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
22  {
23    d_burst = 2;
24    d_maxInFlight = 5;
25    d_timeoutSeconds = 3;
26    d_unexpectedResponse = d_timeouts = 0;
27  }
28  void init()
29  {
30    d_iter = d_container.begin();
31    d_init=true;
32  }
33  void run();
34 
35  unsigned int d_maxInFlight;
36  unsigned int d_timeoutSeconds;
37  int d_burst;
38 
39  uint64_t getTimeouts()
40  {
41    return d_timeouts;
42  }
43 
44  uint64_t getUnexpecteds()
45  {
46    return d_unexpectedResponse;
47  }
48 
49private:
50  struct TTDItem
51  {
52    typename Container::iterator iter;
53    typename SenderReceiver::Identifier id;
54    struct timeval ttd;
55  };
56
57  typedef multi_index_container<
58    TTDItem,
59    indexed_by<
60      ordered_unique<
61        member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
62      >,
63      ordered_non_unique<
64        tag<TimeTag>,
65        member<TTDItem, struct timeval, &TTDItem::ttd>
66      >
67    >
68  >ttdwatch_t; 
69 
70  Container& d_container;
71  SenderReceiver& d_sr;
72 
73  ttdwatch_t d_ttdWatch;
74  typename Container::iterator d_iter;
75  bool d_init;
76 
77  uint64_t d_unexpectedResponse, d_timeouts;
78};
79
80template<typename Container, typename SendReceive> void Inflighter<Container, SendReceive>::run()
81{
82  if(!d_init)
83    init();
84   
85  // cout << "Have "<<d_container.size() << " things to do!"<<endl;
86 
87  for(;;) {
88    int burst = 0;
89    while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { 
90      TTDItem ttdi;
91      ttdi.iter = d_iter++;
92      ttdi.id = d_sr.send(*ttdi.iter);
93      gettimeofday(&ttdi.ttd, 0);
94      ttdi.ttd.tv_sec += d_timeoutSeconds;
95     
96      d_ttdWatch.insert(ttdi);
97     
98      if(++burst == d_burst)
99        break;
100    }
101    int processed=0;
102    if(!d_ttdWatch.empty()) {
103      // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;           
104      typename SendReceive::Answer answer;
105      typename SendReceive::Identifier id;
106     
107      while(d_sr.receive(id, answer)) {
108        typename ttdwatch_t::iterator ival = d_ttdWatch.find(id);
109        if(ival != d_ttdWatch.end()) {
110          ++processed;
111          // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl;
112          d_sr.deliverAnswer(*ival->iter, answer);
113          d_ttdWatch.erase(ival);
114          break; // we can send new questions!
115        }
116        else {
117          // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
118          d_unexpectedResponse++;
119        }
120      }
121   
122     
123      if(!processed) { // no new responses, time for some cleanup
124        struct timeval now;
125        gettimeofday(&now, 0);
126       
127        typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
128        waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
129
130        for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
131          if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
132            d_sr.deliverTimeout(valiter->id);  // so backend can release id
133            waiters_index.erase(valiter++);
134            // cerr<<"Have timeout for id="<< valiter->id <<endl;
135            d_timeouts++;
136          }
137          else 
138            break;
139        }
140      }
141    }
142    if(d_ttdWatch.empty() && d_iter == d_container.end())
143      break;
144  }
145}
146
147#if 0
148StatBag S;
149
150struct SendReceive
151{
152  typedef int Identifier;
153  typedef int Answer;
154  ComboAddress d_remote;
155  int d_socket;
156  int d_id;
157 
158  SendReceive()
159  {
160    d_id = 0;
161    d_socket = socket(AF_INET, SOCK_DGRAM, 0);
162    int val=1;
163    setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
164   
165    ComboAddress local("0.0.0.0", 1024);
166    bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
167   
168    char buf[512];
169   
170    socklen_t remotelen=sizeof(d_remote);
171    cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
172    int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
173    cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
174    Utility::setNonBlocking(d_socket);
175    connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
176  }
177 
178  ~SendReceive()
179  {
180    ::send(d_socket, "done\r\n", 6, 0);
181  }
182 
183  Identifier send(int& i)
184  {
185    cerr<<"Sending a '"<<i<<"'"<<endl;
186    string msg = (boost::format("%d %d\n") % d_id % i).str();
187    ::send(d_socket, msg.c_str(), msg.length(), 0);
188    return d_id++;
189  }
190 
191  bool receive(Identifier& id, int& i)
192  {
193    if(waitForData(d_socket, 0, 500000) > 0) {
194      char buf[512];
195   
196      int len = recv(d_socket, buf, sizeof(buf), 0);
197      string msg(buf, len);
198      if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
199        throw runtime_error("Invalid input");
200      }
201      return 1;
202    }
203    return 0;
204  }
205 
206  void deliverAnswer(int& i, int j)
207  {
208    cerr<<"We sent "<<i<<", got back: "<<j<<endl;
209  }
210};
211
212
213int main()
214{
215  vector<int> numbers;
216  SendReceive sr;
217  Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
218
219  for(int n=0; n < 100; ++n)
220    numbers.push_back(n*n);
221
222
223  for(;;) {
224    try {
225      inflighter.run();
226      break;
227    }
228    catch(exception& e) {
229      cerr<<"Caught exception: "<<e.what()<<endl;
230    }
231  }
232 
233}
234#endif
Note: See TracBrowser for help on using the browser.