00001 #include "RawWriter.hh"
00002 #include "Message.hh"
00003 #include "ConfigHandler.hh"
00004 #include "CommandSwitchFunctions.hh"
00005 #include "EventHandler.hh"
00006 #include "runinfo.hh"
00007 #include <time.h>
00008 #include <string>
00009 #include <iomanip>
00010 #include <sstream>
00011 #include <sys/stat.h>
00012 #include <zlib.h>
00013
00014 RawWriter::RawWriter() :
00015 BaseModule(RawWriter::GetDefaultName(),
00016 "Saves the (gzip'ped) raw data from the digitizers to disk"),
00017 _fout(), _logout(), _log_messenger(0), _ok(true), _bytes_written(0)
00018 {
00019 RegisterParameter("filename",_filename = "",
00020 "Name of the output file; if it doesn't contain a /, assumed relative to <directory>");
00021 RegisterParameter("directory",_directory=".",
00022 "Directory in which to place the output file");
00023 RegisterParameter("create_directory", _create_directory = false,
00024 "If true, create a new directory under the <directory> path with the base filename");
00025 RegisterParameter("filenamebase", _autonamebase = "rawdaq" ,
00026 "Base for automatic filenames <base>_yymmddHHMM.###.out");
00027 RegisterParameter("compression", _compression = Z_BEST_SPEED,
00028 "zip compression level of the event structures");
00029 RegisterParameter("save_config", _save_config = true,
00030 "Do we save the configuration along with the data?");
00031 RegisterParameter("max_file_size", _max_file_size = 0x80000000,
00032 "Maximum file size before making a new file");
00033 RegisterParameter("max_event_in_file", _max_event_in_file = 10000 ,
00034 "Maximum number of events before making a new file");
00035
00036 ConfigHandler* config = ConfigHandler::GetInstance();
00037 config->AddCommandSwitch('f',"filename", "File for saving raw data",
00038 CommandSwitch::DefaultRead<std::string>(_filename),
00039 "file");
00040 config->AddCommandSwitch('d',"directory","Output directory for raw file",
00041 CommandSwitch::DefaultRead<std::string>(_directory),
00042 "directory");
00043 config->AddCommandSwitch('c',"compression","Raw data compression level",
00044 CommandSwitch::DefaultRead<int>(_compression),
00045 "level");
00046 }
00047
00048 RawWriter::~RawWriter()
00049 {
00050 if(_fout.is_open())
00051 CloseCurrentFile();
00052 }
00053
00054 std::string RawWriter::GetDefaultFilename() const
00055 {
00056
00057 time_t rawtime;
00058 time(&rawtime);
00059 struct tm *timeinfo;
00060 timeinfo = localtime(&rawtime);
00061 char fbuf[100];
00062 strftime(fbuf,100,"%y%m%d%H%M",timeinfo);
00063 return _autonamebase+"_"+fbuf;
00064
00065 }
00066
00067 int RawWriter::Initialize()
00068 {
00069
00070 runinfo* info = EventHandler::GetInstance()->GetRunInfo();
00071 if(info){
00072 int ret = info->FillDataForRun(runinfo::RUNSTART);
00073 if(ret){
00074 Message(INFO)<<"User cancelled or error filling run metadata; aborting\n";
00075 return ret;
00076 }
00077 }
00078
00079 if( _filename == ""){
00080
00081 _filename = GetDefaultFilename();
00082 }
00083
00084
00085 if( _filename.find("/") ==std::string::npos && _directory != ""){
00086 std::string temp = _directory;
00087 if(temp[temp.size()-1] != '/')
00088 temp.append("/");
00089 temp.append(_filename);
00090 _filename=temp;
00091 }
00092
00093 if( _filename.rfind(".out") != std::string::npos)
00094 _filename.resize(_filename.size()-4);
00095
00096
00097 if(_create_directory){
00098 std::string dirpart="", filepart=_filename;
00099 size_t slash = _filename.find_last_of('/');
00100 if(slash != std::string::npos){
00101 dirpart = _filename.substr(0,slash+1);
00102 filepart = _filename.substr(slash+1);
00103 }
00104
00105 dirpart += filepart;
00106 Message(INFO)<<"Attempting to create directory "<<dirpart<<std::endl;
00107 int err = mkdir(dirpart.c_str(),S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH);
00108 if(err){
00109 Message(ERROR)<<"Unable to create output directory "<<dirpart<<"\n";
00110 return err;
00111 }
00112 _filename = dirpart + "/" + filepart;
00113 }
00114
00115
00116 _ghead.run_id = EventHandler::GetInstance()->GetRunID();
00117
00118 _ghead.file_index = 0;
00119
00120 std::string logfilename = _filename+".log";
00121 _logout.open(logfilename.c_str());
00122 if(_logout.is_open()){
00123 _log_messenger = MessageHandler::GetInstance()->
00124 AddMessenger(DEBUG, MessageHandler::PrintToStream(_logout,false) );
00125 Message(DEBUG)<<"Started logging messages to "<<logfilename<<".\n";
00126 }
00127 else{
00128 Message(WARNING)<<"Unable to open logfile "<<logfilename
00129 <<"; messages will not be logged!\n";
00130 }
00131
00132 if(_save_config)
00133 SaveConfigFile();
00134 return OpenNewFile();
00135 }
00136
00137 int RawWriter::Process(EventPtr event)
00138 {
00139 if(!_ok){
00140 Message(ERROR)<<"Attempt to write to file in bad state!\n";
00141 return 1;
00142 }
00143
00144 typedef Reader::datablock_header datablock_header;
00145
00146
00147
00148
00149 uint32_t bufsize = sizeof(Reader::event_header);
00150 for(size_t i = 0; i<event->GetRawEvent()->GetNumDataBlocks(); i++){
00151 bufsize += compressBound(event->GetRawEvent()->GetDataBlockSize(i)) +
00152 sizeof(datablock_header);
00153 }
00154
00155 std::vector<char> buf(bufsize);
00156 size_t zipsize=sizeof(Reader::event_header);
00157 for(size_t i = 0;i<event->GetRawEvent()->GetNumDataBlocks(); i++){
00158
00159 uLong thistransfer = bufsize-zipsize-sizeof(datablock_header);
00160 int err = 0;
00161 if(_compression == Z_DEFAULT_COMPRESSION){
00162 err = compress((Bytef*)(&buf[zipsize+sizeof(datablock_header)]),
00163 &thistransfer,
00164 event->GetRawEvent()->GetRawDataBlock(i),
00165 event->GetRawEvent()->GetDataBlockSize(i));
00166 }
00167 else{
00168 err = compress2((Bytef*)(&buf[zipsize+sizeof(datablock_header)]),
00169 &thistransfer,
00170 event->GetRawEvent()->GetRawDataBlock(i),
00171 event->GetRawEvent()->GetDataBlockSize(i),
00172 _compression);
00173 }
00174 if(err != Z_OK){
00175 Message(ERROR)<<"Unable to compress event datablocks in memory\n";
00176 return -1;
00177 }
00178
00179 datablock_header* db_head = (datablock_header*)(&buf[zipsize]);
00180 db_head->total_blocksize_disk = sizeof(datablock_header)+thistransfer;
00181 db_head->datasize = event->GetRawEvent()->GetDataBlockSize(i);
00182 db_head->type = event->GetRawEvent()->GetDataBlockType(i);
00183 zipsize += db_head->total_blocksize_disk;
00184
00185 }
00186
00187 Reader::event_header* ehead = (Reader::event_header*)(&buf[0]);
00188 ehead->event_size = zipsize;
00189 ehead->event_id = event->GetRawEvent()->GetID();
00190 ehead->timestamp = event->GetRawEvent()->GetTimestamp();
00191 ehead->nblocks = event->GetRawEvent()->GetNumDataBlocks();
00192
00193
00194 if(_ghead.nevents>=(uint32_t)_max_event_in_file ||
00195 _ghead.file_size + ehead->event_size > _max_file_size){
00196 if( CloseCurrentFile() || OpenNewFile() ){
00197 Message(ERROR)<<"Error occurred when trying to open new file!\n";
00198 return 1;
00199 }
00200 }
00201
00202
00203 if(!_fout.write((const char*)(&buf[0]), zipsize)){
00204 Message(ERROR)<<"Error occurred when writing event "<<ehead->event_id
00205 <<"to disk!\n";
00206 return -1;
00207 }
00208 _bytes_written += ehead->event_size;
00209
00210 _ghead.nevents++;
00211 if(_ghead.event_id_min > ehead->event_id)
00212 _ghead.event_id_min = ehead->event_id;
00213 _ghead.event_id_max = ehead->event_id;
00214 _ghead.file_size += ehead->event_size;
00215
00216 return 0;
00217 }
00218
00219 int RawWriter::Finalize()
00220 {
00221 if(_fout.is_open()){
00222 CloseCurrentFile();
00223 Message(INFO)<<_bytes_written/1024/1024<<" MiB saved to "<<_filename<<"\n";
00224 if(_bytes_written==0){
00225
00226
00227
00228
00229
00230
00231 }
00232 }
00233 if(_save_config && _bytes_written > 0){
00234
00235 runinfo* info = EventHandler::GetInstance()->GetRunInfo();
00236 if(info){
00237 int ret = info->FillDataForRun(runinfo::RUNEND);
00238 if(ret){
00239 Message(WARNING)<<"User cancel or error filling end of run metadata!\n";
00240
00241 }
00242 }
00243 SaveConfigFile();
00244 }
00245 if(_logout.is_open()){
00246 Message(DEBUG)<<"Closing logfile.\n";
00247 MessageHandler::GetInstance()->RemoveMessenger(_log_messenger);
00248 _log_messenger = 0;
00249 _logout.close();
00250 }
00251 return 0;
00252 }
00253
00254
00255 int RawWriter::OpenNewFile()
00256 {
00257 if(_fout.is_open()){
00258 Message(WARNING)<<"Tried to open new file while current file still open!\n";
00259 CloseCurrentFile();
00260 }
00261
00262
00263 std::stringstream fname;
00264 fname<<_filename<<"."<<std::setw(3)<<std::setfill('0')
00265 <<_ghead.file_index<<".out";
00266 Message(INFO)<<"Opening file "<<fname.str()<<std::endl;
00267 _fout.open(fname.str().c_str(), std::ios::out|std::ios::binary);
00268 if(!_fout.is_open()){
00269 Message(ERROR)<<"Unable to open file "<<fname.str()<<" for output!\n";
00270 _ok = false;
00271 return 1;
00272 }
00273
00274 _ghead.start_time = time(0);
00275
00276 _ghead.nevents = 0;
00277
00278 _ghead.event_id_min = 0xFFFFFFFF;
00279
00280 _ghead.event_id_max = 0;
00281
00282 _ghead.file_size = _ghead.global_header_size;
00283
00284 if(!_fout.write((const char*)(&_ghead), _ghead.global_header_size)){
00285 Message(ERROR)<<"RawWriter: Error writing header to file "<<fname<<"\n";
00286 return 2;
00287 }
00288
00289 return 0;
00290 }
00291
00292 int RawWriter::CloseCurrentFile()
00293 {
00294 if(!_fout.is_open())
00295 return 0;
00296
00297 _ghead.end_time = time(0);
00298 _fout.seekp(0);
00299 _fout.write((const char*)(&_ghead), _ghead.global_header_size);
00300 _fout.close();
00301
00302 _ghead.file_index++;
00303 return 0;
00304 }
00305
00306 void RawWriter::SaveConfigFile()
00307 {
00308
00309 std::string cfgfile(_filename);
00310 if(_filename.rfind(".gz") != std::string::npos)
00311 cfgfile.resize(cfgfile.size()-3);
00312
00313 if(_filename.rfind(".out") != std::string::npos)
00314 cfgfile.resize(cfgfile.size()-4);
00315 ConfigHandler::GetInstance()->SaveToFile((cfgfile+".cfg").c_str());
00316
00317 }