root/trunk/splitpipe/main.cc @ 348

Revision 348, 8.3 KB (checked in by ahu, 8 years ago)

make it work :-)
A lot still needs to be done but it appears to work

Line 
1#include <unistd.h>
2#include <getopt.h>
3#include <fcntl.h>
4#include <sys/select.h>
5#include <time.h>
6#include <sys/time.h>
7#include <sys/wait.h>
8#include <stdint.h>
9#include <signal.h>
10
11#include "ringbuffer.hh"
12
13struct {
14  size_t bufferSize;
15  uint64_t chunkSize;
16  bool verbose;
17  bool debug;
18  vector<string> outputCommand;
19} parameters;
20
21
22uint64_t getSize(const char* desc) 
23{
24  static struct predef {
25    const char* name;
26    uint64_t kb;
27  } predefinedSizes[]= { 
28    {"floppy", 1440000 }, 
29    {"CD", 650000000ULL }, 
30    {"CD-80", 700000000ULL }, 
31    {"CDR-80", 700000000ULL }, 
32    {"DVD", 4700000000ULL }, 
33    {"DVD-5", 4700000000ULL }, 
34    {0, 0} 
35  };
36
37  for(struct predef* p=predefinedSizes; p->name; ++p) {
38    if(!strcasecmp(p->name, desc))
39      return p->kb;
40  }
41  return atoi(desc)*1024;
42}
43
44bool g_havebreak;
45
46void breakHandler(int t)
47{
48  cerr<<"\nsplitpipe: Received interrupt request, terminating output"<<endl;
49  g_havebreak=true;
50}
51
52void unixDie(const string& during)
53{
54  throw runtime_error("during "+string(during)+": "+strerror(errno));
55}
56
57
58void setNonBlocking(int fd)
59{
60  int flags=fcntl(fd,F_GETFL,0);
61  if(flags<0 || fcntl(fd, F_SETFL,flags|O_NONBLOCK) <0)
62    unixDie("Setting filedescriptor to nonblocking failed");
63}
64
65double getTime()
66{
67  struct timeval tv;
68  gettimeofday(&tv, 0);
69  return tv.tv_sec + tv.tv_usec/1000000.0;
70}
71
72
73void usage()
74{
75  cerr<<"splitpipe syntax:\n\n";
76  cerr<<" --buffer-size, -b\tSize of buffer before output, in megabytes"<<endl;
77  cerr<<" --chunk-size, -c\tSize of output chunks, in kilobytes, or use 'DVD', 'CDR' or 'CDR-80'"<<endl;
78  cerr<<" --help, -h\t\tGive this helpful message"<<endl;
79  cerr<<" --verbose, -v\t\tGive verbose output\n\n";
80 
81  exit(1);
82 
83}
84
85void ParseCommandline(int argc, char** argv)
86{
87  int c;
88 
89  while (1) {
90    int option_index = 0;
91    static struct option long_options[] = {
92      {"buffer-size", 1, 0, 'b'},
93      {"chunk-size", 1, 0, 'c'},
94      {"debug", 0, 0, 'd'},
95      {"verbose", 0, 0, 'v'},
96      {"help", 0, 0, 'h'},
97      {0, 0, 0, 0}
98    };
99   
100    c = getopt_long (argc, argv, "b:c:dhv",
101                     long_options, &option_index);
102    if (c == -1)
103      break;
104   
105    switch (c) {
106    case 'b':
107      parameters.bufferSize=1024*atoi(optarg);
108      break;
109    case 'c':
110      parameters.chunkSize=getSize(optarg);
111      break;
112    case 'd':
113      parameters.debug=1;
114      break;
115    case 'h':
116      usage();
117      break;
118    case 'v':
119      parameters.verbose=true;
120      break;
121    }
122  }
123  if (optind < argc) {
124    while (optind < argc) {
125      parameters.outputCommand.push_back(argv[optind++]);
126    }
127  }
128}
129
130pid_t g_pid;
131
132int spawnOutputThread()
133{
134  int d_fds[2];
135  if(pipe(d_fds) < 0)
136    unixDie("unable creating pipe");
137 
138  int pid=fork();
139  if(pid < 0)
140    unixDie("Error during fork");
141
142  if(pid) { // parent
143    g_pid=pid;
144    close(d_fds[0]);
145
146  } else {
147    close(d_fds[1]);
148    dup2(d_fds[0], 0); // connect to stdin
149   
150    char* argvp[parameters.outputCommand.size() + 1];
151    argvp[0]=const_cast<char*>(parameters.outputCommand[0].c_str()); // FOAD 1
152    unsigned int n=0;
153    for(; n < parameters.outputCommand.size() ; ++n)
154      argvp[n]=const_cast<char*>(parameters.outputCommand[n].c_str());
155    argvp[n]=0;
156
157    if(execvp(argvp[0], argvp))
158      unixDie("launch of output script");
159
160    cerr<<"We should Never Ever end up here"<<endl;
161  }
162  return d_fds[1];
163}
164
165
166int main(int argc, char** argv)
167try
168{
169  parameters.debug=parameters.verbose=false;
170  parameters.bufferSize=1000000;
171  parameters.chunkSize=getSize("DVD-5");
172  ParseCommandline(argc, argv);
173
174  signal(SIGPIPE, SIG_IGN);
175  signal(SIGINT, breakHandler);
176  cerr.setf(ios::fixed);
177  cerr.precision(2);
178
179  if(parameters.verbose) {
180    cerr<<"Buffer size: " << parameters.bufferSize/1000000.0 <<" MB\n";
181    cerr<<"Chunk size: " << parameters.chunkSize/1000000.0 <<" MB\n";
182    //    cerr<<"Output command: "<<parameters.outputCommand<<endl;
183  }
184
185  if(parameters.outputCommand.empty()) {
186    cerr<<"No output command specified - unable to write data\n\n";
187    cerr<<"Suggested command for cd: \n";
188    cerr<<"cdrecord dev=/dev/cdrom speed=24 -eject -dummy -tao\n";
189    cerr<<"\nSuggested command for dvd: \n";
190    cerr<<"growisofs -Z/dev/dvd=/dev/stdin -dry-run\n";
191    exit(1);
192  }
193
194  if(!parameters.bufferSize) {
195    cerr<<"Buffer size set to zero, which is unsupported. Try 1000 for 1 megabyte"<<endl;
196    exit(1);
197  }
198
199  if(!parameters.chunkSize) {
200    cerr<<"Chunk size set to zero, which is unsupported. Try --chunk-size DVD for DVD-size chunks"<<endl;
201    exit(1);
202  }
203 
204
205  RingBuffer rb(parameters.bufferSize);
206  setNonBlocking(0);
207  setNonBlocking(1);
208
209  bool inputEof=false, outputEof=false;
210  bool outputOnline=false;
211  int outputfd;
212  uint64_t amountOutput=0;
213
214  char *buffer = new char[parameters.bufferSize];
215
216  if(parameters.verbose) 
217    cerr<<"Prebuffering before starting output script..";
218
219  bool d_firstchunk=true;
220
221  while(1) {
222
223    if(!outputOnline && (inputEof || (1.0 * rb.available() / parameters.bufferSize > 0.5))) {
224      if(d_firstchunk) {
225        cerr<<" done"<<endl;;
226        d_firstchunk=false;
227      }
228      else {
229        cerr<<"\nsplitpipe: reload media, if necessary, and press enter to continue"<<endl;
230        FILE *fp=fopen("/dev/tty", "r");
231        if(!fp) 
232          unixDie("opening of /dev/tty for user input");
233
234        char line[80];
235        fgets(line, sizeof(line) - 1, fp);
236        fclose(fp);
237      }
238
239      cerr<<"splitpipe: bringing output script online - buffer " << 100.0*rb.available() / parameters.bufferSize <<"% full"<<endl;
240      outputfd=spawnOutputThread();
241      outputOnline=true;
242      amountOutput=0;
243    }
244
245    fd_set inputs;
246    fd_set outputs;
247
248    FD_ZERO(&inputs);
249    FD_ZERO(&outputs);
250
251    if(!inputEof && rb.room())
252      FD_SET(0, &inputs);
253
254    if(rb.available() && outputOnline)
255      FD_SET(outputfd, &outputs);
256
257    int ret=select( outputOnline ? (outputfd + 1) : 1, 
258                    &inputs, &outputs, 0, 0);
259    if(ret < 0)
260      unixDie("select returned an error");
261
262    if(!ret)  // odd
263      continue;
264   
265
266    size_t bytesRead=0;
267    if(!inputEof && FD_ISSET(0, &inputs)) {
268      ret=read(0, buffer, min((size_t)1000000, rb.room()));
269      if(ret < 0)
270        unixDie("Reading from standard input");
271      if(!ret) {
272        if(parameters.debug) 
273          cerr<<"EOF on input, room in buffer: "<<rb.room()<<endl;
274        inputEof=true;
275      } else {
276        if(parameters.debug)
277          cerr<<"Read "<<ret<<" bytes, and stored them"<<endl;
278        rb.store(buffer, ret);
279        bytesRead=ret;
280      }
281    }
282
283    if(!outputEof && outputOnline && rb.available() &&  FD_ISSET(outputfd, &outputs)) {
284      size_t totalBytes=0;
285      do {
286        const char *rbuffer;
287        size_t len;
288        rb.get(&rbuffer, &len);
289
290        len=min((uint64_t)len, parameters.chunkSize - amountOutput);
291       
292        if(!len) {
293          cerr<<"\nsplitpipe: output a full chunk, closing pipe, waiting for output command to exit..\n";
294          close(outputfd);
295          int status;
296          if(waitpid(g_pid, &status, 0) < 0)
297            unixDie("wait on child process");
298
299          if(WIFEXITED(status))
300            cerr<<"\nsplitpipe: output command exited with status "<<WEXITSTATUS(status)<<endl;
301          else {
302            cerr<<"\nsplitpipe: output command exited abnormally";
303            if(WIFSIGNALED(status))
304              cerr<<", by signal "<<WTERMSIG(status);
305            cerr<<endl;
306          }
307         
308          outputOnline=false;
309          outputfd=-1;
310          amountOutput=0;
311          break; 
312        }
313
314        ret=write(outputfd, rbuffer, len);
315        if(ret < 0) {
316          if(errno==EAGAIN)
317            break;
318          else
319            unixDie("Writing to standard output");
320        }
321
322        if(!ret) {
323          if(parameters.debug) 
324            cerr<<"Had EOF on *OUTPUT*"<<endl;
325          outputEof=true;
326        }
327        if(parameters.debug) 
328          cerr<<"Wrote out "<<ret<<" out of "<<len<<" bytes"<<endl;
329        rb.advance(ret);
330
331        amountOutput+=ret;
332
333        totalBytes+=ret;
334
335        if(parameters.debug) 
336          cerr<<"There are now "<<rb.available()<<" bytes left in the rb"<<endl;
337        //      else
338        //        cerr<<getTime()-startTime<<"\t"<<rb.available()<<"\n";
339      } while(totalBytes < bytesRead && rb.available());
340    }
341
342    if(inputEof && !rb.available())
343      break;
344  }
345
346  if(outputOnline) {
347    cerr<<"\nsplitpipe: done with input, waiting for output script to exit..\n";
348    close(outputfd);
349    int status;
350    waitpid(g_pid, &status, 0);
351    if(WIFEXITED(status))
352      cerr<<"\nsplitpipe: output command exited with status "<<WEXITSTATUS(status)<<endl;
353    else {
354      cerr<<"\nsplitpipe: output command exited abnormally";
355      if(WIFSIGNALED(status))
356        cerr<<", by signal "<<WTERMSIG(status)<<endl;
357    }
358  }
359
360}
361catch(exception &e)
362{
363  cerr<<"Fatal: "<<e.what()<<endl;
364}
Note: See TracBrowser for help on using the browser.