RawWriter.cc

00001 #include "RawWriter.hh"
00002 #include "Message.hh"
00003 #include "ConfigHandler.hh"
00004 #include "CommandSwitchFunctions.hh"
00005 #include "EventHandler.hh"
00006 #include "runinfo.hh"
00007 #include <time.h>
00008 #include <string>
00009 #include <iomanip>
00010 #include <sstream>
00011 #include <sys/stat.h> //needed for mkdir
00012 #include <zlib.h>
00013 
00014 RawWriter::RawWriter() : 
00015   BaseModule(RawWriter::GetDefaultName(),
00016              "Saves the (gzip'ped) raw data from the digitizers to disk"), 
00017   _fout(), _logout(), _log_messenger(0), _ok(true), _bytes_written(0)
00018 {
00019   RegisterParameter("filename",_filename = "",
00020                     "Name of the output file; if it doesn't contain a /, assumed relative to <directory>");
00021   RegisterParameter("directory",_directory=".", 
00022                     "Directory in which to place the output file");
00023   RegisterParameter("create_directory", _create_directory = false,
00024                     "If true, create a new directory under the <directory> path with the base filename");
00025   RegisterParameter("filenamebase", _autonamebase = "rawdaq" ,
00026                     "Base for automatic filenames <base>_yymmddHHMM.###.out");
00027   RegisterParameter("compression", _compression = Z_BEST_SPEED,
00028                     "zip compression level of the event structures");
00029   RegisterParameter("save_config", _save_config = true,
00030                     "Do we save the configuration along with the data?");
00031   RegisterParameter("max_file_size", _max_file_size = 0x80000000, //2 GiB
00032                     "Maximum file size before making a new file");
00033   RegisterParameter("max_event_in_file", _max_event_in_file = 10000 , 
00034                     "Maximum number of events before making a new file");
00035 
00036   ConfigHandler* config = ConfigHandler::GetInstance();
00037   config->AddCommandSwitch('f',"filename", "File for saving raw data",
00038                            CommandSwitch::DefaultRead<std::string>(_filename),
00039                            "file");
00040   config->AddCommandSwitch('d',"directory","Output directory for raw file",
00041                            CommandSwitch::DefaultRead<std::string>(_directory),
00042                            "directory");
00043   config->AddCommandSwitch('c',"compression","Raw data compression level",
00044                            CommandSwitch::DefaultRead<int>(_compression),
00045                            "level");
00046 }
00047 
00048 RawWriter::~RawWriter()
00049 {
00050   if(_fout.is_open())
00051     CloseCurrentFile();
00052 }
00053 
00054 std::string RawWriter::GetDefaultFilename() const
00055 {
00056   //create the default filename
00057   time_t rawtime;
00058   time(&rawtime);
00059   struct tm *timeinfo;
00060   timeinfo = localtime(&rawtime);
00061   char fbuf[100];
00062   strftime(fbuf,100,"%y%m%d%H%M",timeinfo);
00063   return _autonamebase+"_"+fbuf;
00064     
00065 }
00066 
00067 int RawWriter::Initialize()
00068 {
00069   //query user for run metadata
00070   runinfo* info = EventHandler::GetInstance()->GetRunInfo();
00071   if(info){
00072     int ret = info->FillDataForRun(runinfo::RUNSTART);
00073     if(ret){
00074       Message(INFO)<<"User cancelled or error filling run metadata; aborting\n";
00075       return ret;
00076     }
00077   }
00078   
00079   if( _filename == ""){
00080     //was not set manually, so use auto
00081     _filename = GetDefaultFilename();
00082   }
00083   
00084   //see if we need to prepend the directory
00085   if( _filename.find("/") ==std::string::npos && _directory != ""){
00086     std::string temp = _directory;
00087     if(temp[temp.size()-1] != '/')
00088       temp.append("/");
00089     temp.append(_filename);
00090     _filename=temp;
00091   }
00092   //see if we need to remove the .out suffix
00093   if( _filename.rfind(".out") != std::string::npos)
00094     _filename.resize(_filename.size()-4);
00095   
00096   //make a directory before starting the file if requested
00097   if(_create_directory){
00098     std::string dirpart="", filepart=_filename;
00099     size_t slash = _filename.find_last_of('/');
00100     if(slash != std::string::npos){
00101       dirpart = _filename.substr(0,slash+1);
00102       filepart = _filename.substr(slash+1);
00103     }
00104     //filepart _should_ have suffixes removed...
00105     dirpart += filepart;
00106     Message(INFO)<<"Attempting to create directory "<<dirpart<<std::endl;
00107     int err = mkdir(dirpart.c_str(),S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH);
00108     if(err){
00109         Message(ERROR)<<"Unable to create output directory "<<dirpart<<"\n";
00110         return err;
00111     }
00112     _filename = dirpart + "/" + filepart;
00113   }
00114   
00115   //set the run id
00116   _ghead.run_id = EventHandler::GetInstance()->GetRunID();
00117   //set the file index
00118   _ghead.file_index = 0;
00119   //open up the log file
00120   std::string logfilename = _filename+".log";
00121   _logout.open(logfilename.c_str());
00122   if(_logout.is_open()){
00123     _log_messenger = MessageHandler::GetInstance()->
00124       AddMessenger(DEBUG, MessageHandler::PrintToStream(_logout,false) );
00125     Message(DEBUG)<<"Started logging messages to "<<logfilename<<".\n";
00126   }
00127   else{
00128     Message(WARNING)<<"Unable to open logfile "<<logfilename
00129                     <<"; messages will not be logged!\n";
00130   } 
00131   //write the partial config file
00132   if(_save_config)
00133     SaveConfigFile();
00134   return OpenNewFile();
00135 }
00136 
00137 int RawWriter::Process(EventPtr event)
00138 {
00139   if(!_ok){
00140     Message(ERROR)<<"Attempt to write to file in bad state!\n";
00141     return 1;
00142   }
00143   
00144   typedef Reader::datablock_header datablock_header;
00145   //compress all of the datablocks into a separate buffer
00146   //each block has compressed size, including header, uncompressed data size, 
00147   //and type as header
00148   //determine the total size of the output buffer
00149   uint32_t bufsize = sizeof(Reader::event_header);
00150   for(size_t i = 0; i<event->GetRawEvent()->GetNumDataBlocks(); i++){
00151     bufsize += compressBound(event->GetRawEvent()->GetDataBlockSize(i)) + 
00152       sizeof(datablock_header);
00153   }
00154   //zip the data into the buffer
00155   std::vector<char> buf(bufsize);
00156   size_t zipsize=sizeof(Reader::event_header);
00157   for(size_t i = 0;i<event->GetRawEvent()->GetNumDataBlocks(); i++){
00158     //write the data into a space after the header
00159     uLong thistransfer = bufsize-zipsize-sizeof(datablock_header);
00160     int err = 0;
00161     if(_compression == Z_DEFAULT_COMPRESSION){
00162       err = compress((Bytef*)(&buf[zipsize+sizeof(datablock_header)]),
00163                      &thistransfer,
00164                      event->GetRawEvent()->GetRawDataBlock(i),
00165                      event->GetRawEvent()->GetDataBlockSize(i));
00166     }
00167     else{
00168       err = compress2((Bytef*)(&buf[zipsize+sizeof(datablock_header)]),
00169                       &thistransfer,
00170                       event->GetRawEvent()->GetRawDataBlock(i),
00171                       event->GetRawEvent()->GetDataBlockSize(i),
00172                       _compression);
00173     }
00174     if(err != Z_OK){
00175       Message(ERROR)<<"Unable to compress event datablocks in memory\n";
00176       return -1;
00177     }
00178     //write the header
00179     datablock_header* db_head = (datablock_header*)(&buf[zipsize]);
00180     db_head->total_blocksize_disk = sizeof(datablock_header)+thistransfer;
00181     db_head->datasize = event->GetRawEvent()->GetDataBlockSize(i);
00182     db_head->type = event->GetRawEvent()->GetDataBlockType(i);
00183     zipsize += db_head->total_blocksize_disk;
00184     
00185   }
00186   //set values in the event header
00187   Reader::event_header* ehead = (Reader::event_header*)(&buf[0]);
00188   ehead->event_size = zipsize;
00189   ehead->event_id = event->GetRawEvent()->GetID();
00190   ehead->timestamp = event->GetRawEvent()->GetTimestamp();
00191   ehead->nblocks = event->GetRawEvent()->GetNumDataBlocks();
00192 
00193   //see if we need to make a new file
00194   if(_ghead.nevents>=(uint32_t)_max_event_in_file || 
00195      _ghead.file_size + ehead->event_size > _max_file_size){
00196     if( CloseCurrentFile() || OpenNewFile() ){
00197       Message(ERROR)<<"Error occurred when trying to open new file!\n";
00198       return 1;
00199     }
00200   }
00201   
00202   //actually write the event
00203   if(!_fout.write((const char*)(&buf[0]), zipsize)){
00204     Message(ERROR)<<"Error occurred when writing event "<<ehead->event_id
00205                   <<"to disk!\n";
00206     return -1;
00207   }
00208   _bytes_written += ehead->event_size;
00209   //update info for global header
00210   _ghead.nevents++;
00211   if(_ghead.event_id_min > ehead->event_id)
00212     _ghead.event_id_min = ehead->event_id;
00213   _ghead.event_id_max = ehead->event_id;
00214   _ghead.file_size += ehead->event_size;
00215   
00216   return 0;
00217 }
00218 
00219 int RawWriter::Finalize()
00220 {
00221   if(_fout.is_open()){
00222     CloseCurrentFile();
00223     Message(INFO)<<_bytes_written/1024/1024<<" MiB saved to "<<_filename<<"\n";
00224     if(_bytes_written==0){
00225       //Message(WARNING)<<"0 bytes saved; deleting file."<<std::endl;
00226       //char command[40];
00227       //sprintf(command,"rm -f %s",_filename.c_str());
00228       //int result = system(command);
00229       //if(result)
00230       //Message(ERROR)<<"Unable to delete file!\n";
00231     }
00232   }
00233   if(_save_config && _bytes_written > 0){
00234     //query user for run metadata
00235     runinfo* info = EventHandler::GetInstance()->GetRunInfo();
00236     if(info){
00237       int ret = info->FillDataForRun(runinfo::RUNEND);
00238       if(ret){
00239         Message(WARNING)<<"User cancel or error filling end of run metadata!\n";
00240         //return ret;
00241       }
00242     }
00243     SaveConfigFile();
00244   }
00245   if(_logout.is_open()){
00246     Message(DEBUG)<<"Closing logfile.\n";
00247     MessageHandler::GetInstance()->RemoveMessenger(_log_messenger);
00248     _log_messenger = 0;
00249     _logout.close();
00250   }
00251   return 0;
00252 }
00253 
00254 
00255 int RawWriter::OpenNewFile()
00256 {
00257   if(_fout.is_open()){
00258     Message(WARNING)<<"Tried to open new file while current file still open!\n";
00259     CloseCurrentFile();
00260   }
00261   
00262   //set the filename to filename.###.out, where ### is file_index
00263   std::stringstream fname;
00264   fname<<_filename<<"."<<std::setw(3)<<std::setfill('0')
00265        <<_ghead.file_index<<".out";
00266   Message(INFO)<<"Opening file "<<fname.str()<<std::endl;
00267   _fout.open(fname.str().c_str(), std::ios::out|std::ios::binary);
00268   if(!_fout.is_open()){
00269     Message(ERROR)<<"Unable to open file "<<fname.str()<<" for output!\n";
00270     _ok = false;
00271     return 1;
00272   }
00273   //write the "blank" global header 
00274   _ghead.start_time = time(0);
00275   //reset nevents
00276   _ghead.nevents = 0;
00277   //set min event id to max value so we can set it properly during Process
00278   _ghead.event_id_min = 0xFFFFFFFF;
00279   //same for max id
00280   _ghead.event_id_max = 0;
00281   //reset the file_size 
00282   _ghead.file_size = _ghead.global_header_size;
00283   
00284   if(!_fout.write((const char*)(&_ghead), _ghead.global_header_size)){
00285     Message(ERROR)<<"RawWriter: Error writing header to file "<<fname<<"\n";
00286     return 2;
00287   }
00288   
00289   return 0;
00290 }
00291 
00292 int RawWriter::CloseCurrentFile()
00293 {
00294   if(!_fout.is_open())
00295     return 0;
00296   //save the completed global header
00297   _ghead.end_time = time(0);
00298   _fout.seekp(0);
00299   _fout.write((const char*)(&_ghead), _ghead.global_header_size);
00300   _fout.close();
00301   //increment the file_index counter
00302   _ghead.file_index++;
00303   return 0;
00304 }
00305 
00306 void RawWriter::SaveConfigFile()
00307 {
00308   //strip the '.gz' off the end of the file
00309   std::string cfgfile(_filename);
00310   if(_filename.rfind(".gz") != std::string::npos)
00311     cfgfile.resize(cfgfile.size()-3);
00312   //strip off .out
00313   if(_filename.rfind(".out") != std::string::npos)
00314     cfgfile.resize(cfgfile.size()-4);
00315   ConfigHandler::GetInstance()->SaveToFile((cfgfile+".cfg").c_str());
00316   
00317 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Defines

Generated on 20 Jun 2014 for daqman by  doxygen 1.6.1