00001
00013 #include "V172X_Daq.hh"
00014 #include "V172X_Event.hh"
00015 #include "ConfigHandler.hh"
00016 #include "CommandSwitchFunctions.hh"
00017 #include "EventHandler.hh"
00018 #include "AsyncEventHandler.hh"
00019 #include "RawWriter.hh"
00020 #include "ProcessedPlotter.hh"
00021 #include "Reader.hh"
00022
00023 #include "GenericAnalysis.hh"
00024 #include "BaselineFinder.hh"
00025 #include "PulseFinder.hh"
00026 #include "Integrator.hh"
00027 #include "EvalRois.hh"
00028 #include "SpectrumMaker.hh"
00029 #include "ConvertData.hh"
00030 #include "SumChannels.hh"
00031 #include "TriggerHistory.hh"
00032
00033 #include "runinfo.hh"
00034
00035 #include "kbhit.h"
00036 #include <exception>
00037 #include <string>
00038 #include "Message.hh"
00039 #include <time.h>
00040 #include <fstream>
00041 #include <algorithm>
00042 #include <numeric>
00043 #include "boost/thread/thread.hpp"
00044 bool stop_run = false;
00045
00046
00047 struct PrintStats{
00048 time_t last_print_time;
00049 unsigned long events_processed;
00050 unsigned long long bytes_processed;
00051 void Clear() {
00052 last_print_time = time(0);
00053 events_processed = 0;
00054 bytes_processed = 0;
00055 }
00056 PrintStats() { Clear(); }
00057 };
00058 std::ostream& operator<<(std::ostream& out, const PrintStats& stats){
00059 time_t now = time(0);
00060 out<<"In last "<<now-stats.last_print_time<<" seconds, processed "
00061 <<stats.events_processed<<" events at "
00062 <<stats.bytes_processed/(now-stats.last_print_time)/1024<<" kiB/s";
00063 return out;
00064 }
00065
00066
00067 class SpectrumAdder{
00068 std::vector<SpectrumMaker*>& _spectra;
00069 public:
00070 SpectrumAdder(std::vector<SpectrumMaker*>& v) : _spectra(v) {}
00071 std::istream& operator()(std::istream& in){
00072
00073 std::string name;
00074 in>>name;
00075 _spectra.push_back(new SpectrumMaker(name));
00076
00077 in>>*_spectra.back();
00078 return in;
00079 }
00080 };
00081
00082 class InsertComment{
00083 runinfo* _info;
00084 public:
00085 InsertComment(runinfo* info) : _info(info) {}
00086 int operator()(const char* val){
00087 _info->SetMetadata("comment",std::string(val)) ; return 0;
00088 }
00089 };
00090
00091 int main(int argc, char** argv)
00092 {
00093
00094
00095 ConfigHandler* config = ConfigHandler::GetInstance();
00096
00097
00098 EventHandler* modules = EventHandler::GetInstance();
00099
00100 RawWriter* writer = modules->AddModule<RawWriter>();
00101 ConvertData* read_headers = modules->AddModule<ConvertData>("ReadHeaders");
00102 read_headers->SetHeadersOnly(true);
00103
00104 std::vector<AsyncEventHandler*> async_threads;
00105
00106
00107 AsyncEventHandler thread1;
00108 modules->AddAsyncReceiver(&thread1);
00109 thread1.AddModule(new ConvertData);
00110 thread1.AddModule(new SumChannels);
00111 thread1.AddModule(new BaselineFinder);
00112 thread1.AddModule(new Integrator);
00113 thread1.AddModule(new EvalRois);
00114 async_threads.push_back(&thread1);
00115
00116
00117
00118 AsyncEventHandler thread2;
00119
00120 RootGraphix* rootgraphix = new RootGraphix;
00121 modules->AddModule(rootgraphix, false, true);
00122 thread1.AddReceiver(&thread2);
00123 ProcessedPlotter* plotter = new ProcessedPlotter;
00124 thread2.AddModule(plotter);
00125 thread2.AddModule(new TriggerHistory);
00126 thread2.AddModule(rootgraphix, false);
00127 async_threads.push_back(&thread2);
00128
00129
00130 std::vector<SpectrumMaker*> spectra;
00131 modules->RegisterParameter("spectra",spectra,
00132 "List of live analysis spectra to dispay");
00133
00134
00135
00136
00137 modules->RegisterReadFunction("add_spectrum", SpectrumAdder(spectra),
00138 "Add a new real-time spectrum to display");
00139
00140
00141
00142 long stop_events = -1, stop_time = -1;
00143 long long stop_size = -1;
00144 int stattime=0;
00145 bool write_only = false;
00146 runinfo* info = modules->GetRunInfo();
00147 std::string testmode_file="";
00148 int testmode_dt = 0;
00149 int graphics_refresh = 1;
00150 config->AddCommandSwitch('i', "info", "Set run database info to <info>",
00151 CommandSwitch::DefaultRead<runinfo>(*info),
00152 "info");
00153 config->AddCommandSwitch('m',"message","Set database comment to <message>",
00154 InsertComment(info),"message");
00155
00156 config->AddCommandSwitch('e',"stop_events","Stop after <n> events",
00157 CommandSwitch::DefaultRead<long>(stop_events),
00158 "n");
00159 config->AddCommandSwitch('t',"stop_time","Stop after <n> seconds",
00160 CommandSwitch::DefaultRead<long>(stop_time),
00161 "n");
00162 config->AddCommandSwitch('s',"stop_size","Stop after saving <size> MB",
00163 CommandSwitch::DefaultRead<long long>(stop_size),
00164 "size");
00165 config->AddCommandSwitch(' ',"write-only",
00166 "Disable all modules except for the RawWriter",
00167 CommandSwitch::SetValue<bool>(write_only, true));
00168 config->AddCommandSwitch(' ',"testmode","Fake DAQ from input file <file>",
00169 CommandSwitch::DefaultRead<std::string>
00170 (testmode_file) ,"file");
00171 config->AddCommandSwitch(' ',"testmode-dt","Sleep <N> ms between each event",
00172 CommandSwitch::DefaultRead<int>(testmode_dt),"N");
00173 config->AddCommandSwitch(' ',"stat-time","Print stats every <secs> seconds",
00174 CommandSwitch::DefaultRead<int>(stattime),"secs");
00175 config->AddCommandSwitch(' ',"refresh","Time in s between graphics update",
00176 CommandSwitch::DefaultRead<int>(graphics_refresh),
00177 "secs");
00178 config->RegisterParameter("stop_size",stop_size,
00179 "Maximum file size in bytes before abort the run");
00180 config->RegisterParameter("stop_time",stop_time,
00181 "Maximum time in seconds before we abort the run");
00182 config->RegisterParameter("stop_events",stop_events,
00183 "Maximum number of events before we abort the run");
00184 config->RegisterParameter("stat-time",stattime,
00185 "Time between printing of event/data rates");
00186 config->RegisterReadFunction("require_comment",DeprecatedParameter<bool>());
00187 V172X_Daq daq;
00188
00189 config->SetProgramUsageString("daqman [options]");
00190 config->SetDefaultCfgFile("daqman.cfg");
00191
00192 if(config->ProcessCommandLine(argc, argv))
00193 return 1;
00194
00195 if(config->GetNCommandArgs()){
00196 Message(ERROR)<<"Too many arguments specified."<<std::endl;
00197 config->PrintSwitches(true);
00198 }
00199
00200 AsyncEventHandler thread3;
00201 if(spectra.size() > 0){
00202
00203 thread1.AddReceiver(&thread3);
00204 for(size_t i=0; i<spectra.size(); ++i)
00205 thread3.AddModule((spectra[i]), true, false);
00206 async_threads.push_back(&thread3);
00207 }
00208
00209 if(writer->enabled){
00210 modules->SetRunIDFromFilename(writer->GetFilename());
00211 }
00212
00213
00214 if(write_only){
00215 std::vector<BaseModule*>* mods = modules->GetListOfModules();
00216 for(size_t i=0; i < mods->size(); i++){
00217 if(mods->at(i) != writer) mods->at(i)->enabled = false;
00218 }
00219 }
00220
00221
00222 thread2.SetSleepMillisec(graphics_refresh*1000);
00223
00224
00225 Reader* reader = 0;
00226 if(testmode_file!=""){
00227 reader = new Reader(testmode_file);
00228 if(!reader->IsOk()){
00229 Message(ERROR)<<"Testmode specified, but can't load file "
00230 <<testmode_file<<".\n";
00231 return 1;
00232 }
00233 }
00234
00235
00236 keyboard board;
00237
00238 Message(INFO)<<"Initializing modules...\n";
00239 if(modules->Initialize()){
00240 Message(CRITICAL)<<"Unable to initialize all modules.\n";
00241 return 1;
00242 }
00243
00244 for(size_t i=0; i<async_threads.size(); ++i)
00245 async_threads[i]->StartRunning();
00246 try
00247 {
00248 if(!reader){
00249 Message(INFO)<<"Initializing DAQ...\n";
00250 if(daq.Initialize() == 0){
00251 Message(INFO)<<"Starting Run...\n";
00252 daq.StartRun();
00253 }
00254 else{
00255 Message(CRITICAL)<<"Initialization Error!"<<std::endl;
00256 modules->Finalize();
00257 return -1;
00258 }
00259 }
00260
00261 time_t start_time = time(0);
00262 long events_downloaded = 0;
00263 long long data_downloaded = 0;
00264 PrintStats stats;
00265 while(!stop_run){
00266
00267 if( (stop_time > 0 && time(0)-start_time >= stop_time) ||
00268 (stop_events > 0 && events_downloaded >= stop_events) ||
00269 (stop_size > 0 && writer->GetBytesWritten()/1000000 >= stop_size) ){
00270 stop_run = true;
00271 }
00272
00273 if(board.kbhit()){
00274
00275 char c = board.getch();
00276 switch(c){
00277 case 'q':
00278 case 'Q':
00279 stop_run = true;
00280 break;
00281 case 'p':
00282 case 'P':
00283 plotter->TogglePause();
00284 break;
00285 default:
00286 Message(ERROR)<<"Unknown control character '"<<c<<"'\n";
00287 };
00288 }
00289 if(stop_run) break;
00290
00291 RawEventPtr evt;
00292 if(reader){
00293 evt = reader->GetNextEvent();
00294 if(testmode_dt>0)
00295 boost::this_thread::sleep(boost::posix_time::millisec(testmode_dt));
00296 }
00297 else
00298 evt = daq.GetNextEvent(500000);
00299 if(!evt){
00300
00301 if(reader){
00302 Message(INFO)<<"Reached end of file, exiting testmode.\n";
00303 stop_run = true;
00304 break;
00305 }
00306 else if(daq.GetStatus() != BaseDaq::NORMAL){
00307 Message(ERROR)<<"An error occurred while getting next event.\n";
00308 Message(ERROR)<<"Attempting to abort run...\n";
00309 stop_run = true;
00310 break;
00311 }
00312 else{
00313 Message(DEBUG)<<"Waiting for new event ready in memory...\n";
00314 continue;
00315 }
00316 }
00317 if(modules->Process(evt)){
00318 Message(ERROR)<<"Problem encountered processing event.\n";
00319
00320 continue;
00321 }
00322
00323 data_downloaded += evt->GetDataSize();
00324 events_downloaded++;
00325 stats.events_processed++;
00326 stats.bytes_processed += evt->GetDataSize();
00327 if(stattime>0 && time(0)-stats.last_print_time >= stattime){
00328 Message(INFO)<<stats<<std::endl;
00329 stats.Clear();
00330 }
00331 }
00332 Message(INFO)<<"Ending Run....\n";
00333 if(!reader){
00334 Message(DEBUG)<<"Processing remaining events in queue...\n";
00335 daq.EndRun();
00336 RawEventPtr evt;
00337 while((stop_events<=0 || events_downloaded < stop_events) &&
00338 daq.GetStatus() == BaseDaq::NORMAL &&
00339 (evt = daq.GetNextEvent()) &&
00340 !modules->Process(evt)){
00341 data_downloaded += evt->GetDataSize();
00342 events_downloaded++;
00343 stats.events_processed++;
00344 stats.bytes_processed += evt->GetDataSize();
00345
00346 }
00347 }
00348 time_t delta_time = time(0)-start_time;
00349
00350 for(size_t i=0; i<async_threads.size(); ++i)
00351 async_threads[i]->StopRunning();
00352
00353 modules->Finalize();
00354
00355 Message(INFO)<<events_downloaded<<" events processed.\n";
00356 Message(INFO)<<delta_time<<" seconds elapsed.\n";
00357 if(delta_time>0){
00358 Message(INFO)<<data_downloaded<<" total bytes processed at "
00359 <<data_downloaded/(delta_time)
00360 <<" bytes/s\n";
00361 }
00362 }
00363 catch(std::exception &e)
00364 {
00365 std::cerr<<"Caught Exception: "<<e.what()<<"\n";
00366 }
00367
00368 return daq.GetStatus();
00369 }