root/trunk/pdns/pdns/selectmplexer.cc @ 707

Revision 707, 2.4 KB (checked in by ahu, 7 years ago)

start of mother of all multiplexors :-)

Line 
1#include "mplexer.hh"
2#include "sstuff.hh"
3#include <iostream>
4#include <unistd.h>
5#include "misc.hh"
6#include <boost/lexical_cast.hpp>
7
8using namespace boost;
9
10using namespace std;
11
12void SelectFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, boost::any parameter)
13{
14  Callback cb;
15  cb.d_callback=toDo;
16  cb.d_parameter=parameter;
17  if(cbmap.count(fd))
18    throw FDMultiplexerException("Tried to add fd "+lexical_cast<string>(fd)+ " to multiplexer twice");
19  cbmap[fd]=cb;
20}
21
22void SelectFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
23{
24  if(!cbmap.erase(fd))
25    throw FDMultiplexerException("Tried to remove unlisted fd "+lexical_cast<string>(fd)+ " from multiplexer");
26}
27
28
29int SelectFDMultiplexer::run(struct timeval* now)
30{
31  fd_set readfds, writefds;
32  FD_ZERO(&readfds);
33  FD_ZERO(&writefds);
34 
35  int fdmax=0;
36
37  for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
38    FD_SET(i->first, &readfds);
39    fdmax=max(i->first, fdmax);
40  }
41 
42  struct timeval tv={0,500000};
43  int ret=select(fdmax + 1, &readfds, &writefds, 0, &tv);
44  if(now)
45    gettimeofday(now,0);
46 
47  if(ret < 0 && errno!=EINTR)
48    throw FDMultiplexerException("select returned error: "+stringerror());
49
50  if(ret==0) // nothing
51    return 0;
52
53  d_inrun=true;
54  d_newReadCallbacks=d_readCallbacks;
55  d_newWriteCallbacks=d_writeCallbacks;
56
57  for(callbackmap_t::iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
58    if(FD_ISSET(i->first, &readfds))
59      i->second.d_callback(i->first, i->second.d_parameter);
60  }
61  for(callbackmap_t::iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
62    if(FD_ISSET(i->first, &writefds))
63      i->second.d_callback(i->first, i->second.d_parameter);
64  }
65
66  d_readCallbacks.swap(d_newReadCallbacks);
67  d_writeCallbacks.swap(d_newWriteCallbacks);
68
69  d_inrun=false;
70 
71  return 0;
72}
73
74void acceptData(int fd, boost::any& parameter)
75{
76  cout<<"Have data on fd "<<fd<<endl;
77  Socket* sock=boost::any_cast<Socket*>(parameter);
78  string packet;
79  IPEndpoint rem;
80  sock->recvFrom(packet, rem);
81  cout<<"Received "<<packet.size()<<" bytes!\n";
82}
83
84#if 0
85int main()
86{
87  Socket s(InterNetwork, Datagram);
88 
89  IPEndpoint loc("0.0.0.0", 2000);
90  s.bind(loc);
91
92  SelectFDMultiplexer sfm;
93
94  sfm.addReadFD(s.getHandle(), &acceptData, &s);
95
96  for(int n=0; n < 100 ; ++n) {
97    sfm.run();
98  }
99  sfm.removeReadFD(s.getHandle());
100  sfm.removeReadFD(s.getHandle());
101}
102#endif
103
Note: See TracBrowser for help on using the browser.