Changeset 356

Show
Ignore:
Timestamp:
04/16/05 18:54:21 (8 years ago)
Author:
ahu
Message:

add joinpipe to Makefile
implement stretches in splitpipe

Location:
trunk/splitpipe
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • trunk/splitpipe/Makefile

    r353 r356  
    33OUTPUTDIR=splitpipe-$(VERSION) 
    44 
    5 all: splitpipe 
     5all: splitpipe joinpipe 
    66 
    77clean: 
    88        rm -rf *~ *.o splitpipe  
    99 
    10 splitpipe_OBJECTS=splitpipe.o 
     10common_OBJECTS=misc.o 
     11splitpipe_OBJECTS=splitpipe.o $(common_OBJECTS) 
     12joinpipe_OBJECTS=joinpipe.o $(common_OBJECTS) 
    1113 
    1214splitpipe: $(splitpipe_OBJECTS) 
    1315        g++ $(splitpipe_OBJECTS) -o $@ 
     16 
     17joinpipe: $(joinpipe_OBJECTS) 
     18        g++ $(joinpipe_OBJECTS) -o $@ 
     19 
    1420 
    1521check:  
     
    2329        tar cvzf $(OUTPUTDIR).tar.gz $(OUTPUTDIR)/ 
    2430        rm -rf $(OUTPUTDIR) 
    25          
    26          
  • trunk/splitpipe/splitpipe.cc

    r354 r356  
    2121#include <fcntl.h> 
    2222#include <sys/select.h> 
     23#include <netinet/in.h> 
    2324#include <time.h> 
    2425#include <sys/time.h> 
     
    2627#include <stdint.h> 
    2728#include <signal.h> 
    28  
     29#include "misc.hh" 
    2930#include "ringbuffer.hh" 
    3031 
     
    5354uint64_t getSize(const char* desc)  
    5455{ 
    55  
    5656  for(struct predef* p=predefinedSizes; p->name; ++p) { 
    5757    if(!strcasecmp(p->name, desc)) 
     
    6969} 
    7070 
    71 void unixDie(const string& during) 
    72 { 
    73   throw runtime_error("during "+string(during)+": "+strerror(errno)); 
    74 } 
    75  
    76  
    77 void setNonBlocking(int fd) 
    78 { 
    79   int flags=fcntl(fd,F_GETFL,0); 
    80   if(flags<0 || fcntl(fd, F_SETFL,flags|O_NONBLOCK) <0) 
    81     unixDie("Setting filedescriptor to nonblocking failed"); 
    82 } 
    83  
    84 double getTime() 
    85 { 
    86   struct timeval tv; 
    87   gettimeofday(&tv, 0); 
    88   return tv.tv_sec + tv.tv_usec/1000000.0; 
    89 } 
    90  
     71pid_t g_pid; 
     72void waitForOutputCommandToDie() 
     73{ 
     74  int status; 
     75  if(waitpid(g_pid, &status, 0) < 0) 
     76    unixDie("wait on child process"); 
     77   
     78  if(WIFEXITED(status)) 
     79    cerr<<"\nsplitpipe: output command exited with status "<<WEXITSTATUS(status)<<endl; 
     80  else { 
     81    cerr<<"\nsplitpipe: output command exited abnormally"; 
     82    if(WIFSIGNALED(status)) 
     83      cerr<<", by signal "<<WTERMSIG(status); 
     84    cerr<<endl; 
     85  } 
     86} 
     87 
     88 
     89void outputGaveEof(int outputfd) 
     90{ 
     91  cerr<<"\nsplitpipe: output command gave EOF, waiting for it to exit"<<endl; 
     92  close(outputfd); 
     93  waitForOutputCommandToDie(); 
     94  cerr<<"\nsplitpipe: future versions of splitpipe may allow you to continue, but for now.. exit\n"; 
     95  exit(EXIT_FAILURE); 
     96} 
    9197 
    9298void usage() 
     
    150156} 
    151157 
    152 pid_t g_pid; 
     158 
    153159 
    154160int spawnOutputThread() 
     
    196202} 
    197203 
    198 void waitForOutputCommandToDie() 
    199 { 
    200   int status; 
    201   if(waitpid(g_pid, &status, 0) < 0) 
    202     unixDie("wait on child process"); 
    203    
    204   if(WIFEXITED(status)) 
    205     cerr<<"\nsplitpipe: output command exited with status "<<WEXITSTATUS(status)<<endl; 
    206   else { 
    207     cerr<<"\nsplitpipe: output command exited abnormally"; 
    208     if(WIFSIGNALED(status)) 
    209       cerr<<", by signal "<<WTERMSIG(status); 
    210     cerr<<endl; 
    211   } 
    212 } 
    213204 
    214205int main(int argc, char** argv) 
     
    260251  int outputfd; 
    261252  uint64_t amountOutput=0; 
     253  uint16_t leftInStretch=0; 
     254  int numStretches=0; 
    262255 
    263256  char *buffer = new char[parameters.bufferSize]; 
     
    266259    cerr<<"Prebuffering before starting output script.."; 
    267260 
    268   bool d_firstchunk=true; 
     261  bool d_firstchunk=true; // first chunk does not get the 'press enter' stuff 
    269262 
    270263  while(1) { 
     
    278271        cerr<<"splitpipe: reload media, if necessary, and press enter to continue"<<endl; 
    279272        waitForUser(); 
    280  
    281273      } 
    282274 
     
    332324        rb.get(&rbuffer, &lenAvailable); 
    333325 
     326        if(!leftInStretch) { 
     327          leftInStretch=min((size_t)0xffff, lenAvailable); 
     328 
     329          if(parameters.debug)  
     330            cerr<<"splitpipe: starting a stretch of "<<leftInStretch<<" bytes"<<endl; 
     331          uint16_t amount=htons(leftInStretch); 
     332          ret=writen(outputfd, &amount, sizeof(amount),"write of meta-data to output command"); 
     333          if(!ret) 
     334            outputGaveEof(outputfd); 
     335           
     336          numStretches++; 
     337          break; // go past select again, not sure if this is needed 
     338        } 
     339 
    334340        uint64_t leftInChunk=parameters.chunkSize - amountOutput; 
    335341 
    336342        size_t len=min((uint64_t)lenAvailable, leftInChunk); 
    337343         
     344        len=min(len, (size_t)leftInStretch); 
     345 
    338346        if(!len) { 
    339347          cerr<<"\nsplitpipe: output a full chunk, waiting for output command to exit..\n"; 
     
    357365 
    358366        if(!ret) { 
    359           cerr<<"\nsplitpipe: output command gave EOF, waiting for it to exit"<<endl; 
    360           close(outputfd); 
    361           waitForOutputCommandToDie(); 
    362           cerr<<"\nsplitpipe: future versions of splitpipe may allow you to continue, but for now.. exit\n"; 
    363           exit(EXIT_FAILURE); 
     367          outputGaveEof(outputfd); 
    364368        } 
    365369        if(parameters.debug)  
     
    367371        rb.advance(ret); 
    368372 
    369         amountOutput+=ret; 
    370  
    371         totalBytesOutput+=ret; 
     373        amountOutput += ret; 
     374        leftInStretch -= ret; 
     375        totalBytesOutput += ret; 
    372376 
    373377        if(parameters.debug)  
     
    386390    waitForOutputCommandToDie(); 
    387391  } 
     392  cerr<<"splitpipe: output "<<numStretches<<" stretches\n"; 
    388393} 
    389394catch(exception &e)