root/trunk/splitpipe/main.cc @ 346

Revision 346, 6.7 KB (checked in by ahu, 8 years ago)

improve output
fix termination of last output script

Line 
1#include <unistd.h>
2#include <getopt.h>
3#include <fcntl.h>
4#include <sys/select.h>
5#include <time.h>
6#include <sys/time.h>
7#include <sys/wait.h>
8#include <stdint.h>
9
10#include "ringbuffer.hh"
11
12struct {
13  size_t bufferSize;
14  uint64_t chunkSize;
15  bool verbose;
16  bool debug;
17  vector<string> outputCommand;
18} parameters;
19
20
21uint64_t getSize(const char* desc) 
22{
23  static struct predef {
24    const char* name;
25    uint64_t kb;
26  } predefinedSizes[]= { 
27    {"floppy", 1440000 }, 
28    {"DVD", 4700000000ULL }, 
29    {"DVD-5", 4700000000ULL }, 
30    {0, 0} 
31  };
32
33  for(struct predef* p=predefinedSizes; p->name; ++p) {
34    if(!strcasecmp(p->name, desc))
35      return p->kb;
36  }
37  return atoi(desc)*1024;
38}
39
40void unixDie(const string& during)
41{
42  throw runtime_error("during "+string(during)+": "+strerror(errno));
43}
44
45
46void setNonBlocking(int fd)
47{
48  int flags=fcntl(fd,F_GETFL,0);
49  if(flags<0 || fcntl(fd, F_SETFL,flags|O_NONBLOCK) <0)
50    unixDie("Setting filedescriptor to nonblocking failed");
51}
52
53double getTime()
54{
55  struct timeval tv;
56  gettimeofday(&tv, 0);
57  return tv.tv_sec + tv.tv_usec/1000000.0;
58}
59
60
61void usage()
62{
63  cerr<<"splitpipe syntax:\n\n";
64  cerr<<" --buffer-size, -b\tSize of buffer before output, in megabytes"<<endl;
65  cerr<<" --chunk-size, -c\tSize of output chunks, in kilobytes, or use 'DVD', 'CDR' or 'CDR80'"<<endl;
66  cerr<<" --help, -h\t\tGive this helpful message"<<endl;
67  cerr<<" --verbose, -v\t\tGive verbose output\n\n";
68 
69  exit(1);
70 
71}
72
73void ParseCommandline(int argc, char** argv)
74{
75  int c;
76 
77  while (1) {
78    int option_index = 0;
79    static struct option long_options[] = {
80      {"buffer-size", 1, 0, 'b'},
81      {"chunk-size", 1, 0, 'c'},
82      {"debug", 0, 0, 'd'},
83      {"verbose", 0, 0, 'v'},
84      {"help", 0, 0, 'h'},
85      {0, 0, 0, 0}
86    };
87   
88    c = getopt_long (argc, argv, "b:c:dhv",
89                     long_options, &option_index);
90    if (c == -1)
91      break;
92   
93    switch (c) {
94    case 'b':
95      parameters.bufferSize=1024*atoi(optarg);
96      break;
97    case 'c':
98      parameters.chunkSize=getSize(optarg);
99      break;
100    case 'd':
101      parameters.debug=1;
102      break;
103    case 'h':
104      usage();
105      break;
106    case 'v':
107      parameters.verbose=true;
108      break;
109    }
110  }
111  if (optind < argc) {
112    while (optind < argc) {
113      parameters.outputCommand.push_back(argv[optind++]);
114    }
115  }
116}
117
118pid_t g_pid;
119
120int spawnOutputThread()
121{
122  int d_fds[2];
123  if(pipe(d_fds) < 0)
124    unixDie("unable creating pipe");
125 
126  int pid=fork();
127  if(pid < 0)
128    unixDie("Error during fork");
129
130  if(pid) { // parent
131    g_pid=pid;
132    close(d_fds[0]);
133
134  } else {
135    close(d_fds[1]);
136    dup2(d_fds[0], 0); // connect to stdin
137   
138    char* argvp[parameters.outputCommand.size() + 1];
139    argvp[0]=const_cast<char*>(parameters.outputCommand[0].c_str()); // FOAD 1
140    unsigned int n=0;
141    for(; n < parameters.outputCommand.size() ; ++n)
142      argvp[n]=const_cast<char*>(parameters.outputCommand[n].c_str());
143    argvp[n]=0;
144
145    if(execvp(argvp[0], argvp))
146      unixDie("launch of output script");
147
148    cerr<<"We should Never Ever end up here"<<endl;
149  }
150  return d_fds[1];
151}
152
153
154int main(int argc, char** argv)
155try
156{
157  parameters.debug=parameters.verbose=false;
158  parameters.bufferSize=1000000;
159  parameters.chunkSize=getSize("DVD-5");
160  ParseCommandline(argc, argv);
161
162  cerr.setf(ios::fixed);
163  cerr.precision(2);
164
165  if(parameters.verbose) {
166    cerr<<"Buffer size: " << parameters.bufferSize/1000000.0 <<" MB\n";
167    cerr<<"Chunk size: " << parameters.chunkSize/1000000.0 <<" MB\n";
168    //    cerr<<"Output command: "<<parameters.outputCommand<<endl;
169  }
170
171  if(parameters.outputCommand.empty()) {
172    cerr<<"No output command specified - unable to write data"<<endl;
173    exit(1);
174  }
175
176  if(!parameters.bufferSize) {
177    cerr<<"Buffer size set to zero, which is unsupported. Try 1000 for 1 megabyte"<<endl;
178    exit(1);
179  }
180
181  if(!parameters.chunkSize) {
182    cerr<<"Chunk size set to zero, which is unsupported. Try --chunk-size DVD for DVD-size chunks"<<endl;
183    exit(1);
184  }
185 
186
187  RingBuffer rb(parameters.bufferSize);
188  setNonBlocking(0);
189  setNonBlocking(1);
190
191  bool inputEof=false, outputEof=false;
192  bool outputOnline=false;
193  int outputfd;
194  uint64_t amountOutput=0;
195
196  char *buffer = new char[parameters.bufferSize];
197  while(1) {
198
199    if(!outputOnline && (inputEof || (1.0 * rb.available() / parameters.bufferSize > 0.5))) {
200      cerr<<"bringing output script online - buffer " << 100.0*rb.available() / parameters.bufferSize <<"% full"<<endl;
201      outputfd=spawnOutputThread();
202      outputOnline=true;
203      amountOutput=0;
204    }
205
206    fd_set inputs;
207    fd_set outputs;
208
209    FD_ZERO(&inputs);
210    FD_ZERO(&outputs);
211
212    if(!inputEof && rb.room())
213      FD_SET(0, &inputs);
214
215    if(rb.available() && outputOnline)
216      FD_SET(outputfd, &outputs);
217
218    int ret=select( outputOnline ? (outputfd + 1) : 1, 
219                    &inputs, &outputs, 0, 0);
220    if(ret < 0)
221      unixDie("select returned an error");
222
223    if(!ret)  // odd
224      continue;
225   
226
227    size_t bytesRead=0;
228    if(!inputEof && FD_ISSET(0, &inputs)) {
229      ret=read(0, buffer, min((size_t)1000000, rb.room()));
230      if(ret < 0)
231        unixDie("Reading from standard input");
232      if(!ret) {
233        if(parameters.debug) 
234          cerr<<"EOF on input, room in buffer: "<<rb.room()<<endl;
235        inputEof=true;
236      } else {
237        if(parameters.debug)
238          cerr<<"Read "<<ret<<" bytes, and stored them"<<endl;
239        rb.store(buffer, ret);
240        bytesRead=ret;
241      }
242    }
243
244    if(!outputEof && outputOnline && rb.available() &&  FD_ISSET(outputfd, &outputs)) {
245      size_t totalBytes=0;
246      do {
247        const char *rbuffer;
248        size_t len;
249        rb.get(&rbuffer, &len);
250
251        len=min((uint64_t)len, parameters.chunkSize - amountOutput);
252       
253        if(!len) {
254          cerr<<"Output a full chunk, closing pipe, waiting for output command to exit..";
255          close(outputfd);
256          int status;
257          waitpid(g_pid, &status, 0);
258          cerr<<" done!"<<endl;
259          outputOnline=false;
260          outputfd=-1;
261          amountOutput=0;
262          break; 
263        }
264
265        ret=write(outputfd, rbuffer, len);
266        if(ret < 0) {
267          if(errno==EAGAIN)
268            break;
269          else
270            unixDie("Writing to standard output");
271        }
272
273        if(!ret) {
274          if(parameters.debug) 
275            cerr<<"Had EOF on *OUTPUT*"<<endl;
276          outputEof=true;
277        }
278        if(parameters.debug) 
279          cerr<<"Wrote out "<<ret<<" out of "<<len<<" bytes"<<endl;
280        rb.advance(ret);
281
282        amountOutput+=ret;
283
284        totalBytes+=ret;
285
286        if(parameters.debug) 
287          cerr<<"There are now "<<rb.available()<<" bytes left in the rb"<<endl;
288        //      else
289        //        cerr<<getTime()-startTime<<"\t"<<rb.available()<<"\n";
290      }while(totalBytes < bytesRead && rb.available());
291    }
292
293    if(inputEof && !rb.available())
294      break;
295  }
296
297  if(outputOnline) {
298    cerr<<"Done with input, waiting for output script to exit..";
299    close(outputfd);
300    int status;
301    waitpid(g_pid, &status, 0);
302    cerr<<" done!"<<endl;
303  }
304
305}
306catch(exception &e)
307{
308  cerr<<"Fatal: "<<e.what()<<endl;
309}
Note: See TracBrowser for help on using the browser.