root/trunk/pdns/pdns/inflighter.cc @ 1448

Revision 1448, 5.2 KB (checked in by ahu, 3 years ago)

generic templatized 'do x things at a time' class

Line 
1#include <vector>
2#include <deque>
3#include <iostream>
4#include <boost/foreach.hpp>
5#include <boost/multi_index_container.hpp>
6#include <boost/format.hpp>
7#include <sys/time.h>
8#include <time.h>
9#include "iputils.hh"
10#include "statbag.hh"
11#include <sys/socket.h>
12
13using namespace std;
14using namespace boost::multi_index;
15
// Empty tag type: used below as tag<TimeTag> to name the index of the
// multi_index container that is ordered on TTDItem::ttd.
16struct TimeTag{};
17
18template<typename Container, typename SenderReceiver> class Inflighter
19{
20public:
21  Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
22  {
23    d_burst = 2;
24    d_maxInFlight = 5;
25    d_timeoutSeconds = 3;
26    d_unexpectedResponse = d_timeouts = 0;
27  }
28  void init()
29  {
30    d_iter = d_container.begin();
31    d_init=true;
32  }
33  void run();
34 
35  unsigned int d_maxInFlight;
36  unsigned int d_timeoutSeconds;
37  int d_burst;
38 
39private:
40  struct TTDItem
41  {
42    typename Container::iterator iter;
43    typename SenderReceiver::Identifier id;
44    struct timeval ttd;
45  };
46
47  typedef multi_index_container<
48    TTDItem,
49    indexed_by<
50      ordered_unique<
51        member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
52      >,
53      ordered_non_unique<
54        tag<TimeTag>,
55        member<TTDItem, struct timeval, &TTDItem::ttd>
56      >
57    >
58  >ttdwatch_t; 
59 
60  Container& d_container;
61  SenderReceiver& d_sr;
62 
63 
64 
65  ttdwatch_t d_ttdWatch;
66  typename Container::iterator d_iter;
67  bool d_init;
68 
69  uint64_t d_unexpectedResponse, d_timeouts;
70};
71
72template<typename Container, typename SendReceive> void Inflighter<Container, SendReceive>::run()
73{
74  if(!d_init)
75    init();
76   
77  // cout << "Have "<<d_container.size() << " things to do!"<<endl;
78 
79  for(;;) {
80    int burst = 0;
81    while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { 
82      TTDItem ttdi;
83      ttdi.iter = ++d_iter;
84      ttdi.id = d_sr.send(*ttdi.iter);
85      gettimeofday(&ttdi.ttd, 0);
86      ttdi.ttd.tv_sec += d_timeoutSeconds;
87     
88      d_ttdWatch.insert(ttdi);
89     
90      if(++burst == d_burst)
91        break;
92    }
93    int processed=0;
94    if(!d_ttdWatch.empty()) {
95      // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;           
96      typename SendReceive::Answer answer;
97      typename SendReceive::Identifier id;
98     
99      while(d_sr.receive(id, answer)) {
100        typename ttdwatch_t::iterator ival = d_ttdWatch.find(id);
101        if(ival != d_ttdWatch.end()) {
102          ++processed;
103          // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl;
104          d_sr.deliverAnswer(*ival->iter, answer);
105          d_ttdWatch.erase(ival);
106          break; // we can send new questions!
107        }
108        else {
109         // cerr<<"UNEXPECTED ANSWER!"<<endl;
110          d_unexpectedResponse++;
111        }
112      }
113   
114     
115      if(!processed) { // no new responses, time for some cleanup
116        struct timeval now;
117        gettimeofday(&now, 0);
118       
119        typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
120        waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
121
122        for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
123          if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
124            waiters_index.erase(valiter++);
125           // cerr<<"Have timeout"<<endl;
126          }
127          else 
128            break;
129        }
130      }
131    }
132    if(d_ttdWatch.empty() && d_iter == d_container.end())
133      break;
134  }
135}
136
137#if 0
// Global StatBag object (type from "statbag.hh"). It is not referenced
// anywhere in this file; presumably other linked code expects the symbol
// to exist -- TODO confirm.
138StatBag S;
139
140struct SendReceive
141{
142  typedef int Identifier;
143  typedef int Answer;
144  ComboAddress d_remote;
145  int d_socket;
146  int d_id;
147 
148  SendReceive()
149  {
150    d_id = 0;
151    d_socket = socket(AF_INET, SOCK_DGRAM, 0);
152    int val=1;
153    setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
154   
155    ComboAddress local("0.0.0.0", 1024);
156    bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
157   
158    char buf[512];
159   
160    socklen_t remotelen=sizeof(d_remote);
161    cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
162    int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
163    cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
164    Utility::setNonBlocking(d_socket);
165    connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
166  }
167 
168  ~SendReceive()
169  {
170    ::send(d_socket, "done\r\n", 6, 0);
171  }
172 
173  Identifier send(int& i)
174  {
175    cerr<<"Sending a '"<<i<<"'"<<endl;
176    string msg = (boost::format("%d %d\n") % d_id % i).str();
177    ::send(d_socket, msg.c_str(), msg.length(), 0);
178    return d_id++;
179  }
180 
181  bool receive(Identifier& id, int& i)
182  {
183    if(waitForData(d_socket, 0, 500000) > 0) {
184      char buf[512];
185   
186      int len = recv(d_socket, buf, sizeof(buf), 0);
187      string msg(buf, len);
188      if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
189        throw runtime_error("Invalid input");
190      }
191      return 1;
192    }
193    return 0;
194  }
195 
196  void deliverAnswer(int& i, int j)
197  {
198    cerr<<"We sent "<<i<<", got back: "<<j<<endl;
199  }
200};
201
202
203int main()
204{
205  vector<int> numbers;
206  SendReceive sr;
207  Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
208
209  for(int n=0; n < 100; ++n)
210    numbers.push_back(n*n);
211
212
213  for(;;) {
214    try {
215      inflighter.run();
216      break;
217    }
218    catch(exception& e) {
219      cerr<<"Caught exception: "<<e.what()<<endl;
220    }
221  }
222 
223}
224#endif
Note: See TracBrowser for help on using the browser.