daqman.cc

Go to the documentation of this file.
00001 
00013 #include "V172X_Daq.hh"
00014 #include "V172X_Event.hh"
00015 #include "ConfigHandler.hh"
00016 #include "CommandSwitchFunctions.hh"
00017 #include "EventHandler.hh"
00018 #include "AsyncEventHandler.hh"
00019 #include "RawWriter.hh"
00020 #include "ProcessedPlotter.hh"
00021 #include "Reader.hh"
00022 
00023 #include "GenericAnalysis.hh"
00024 #include "BaselineFinder.hh"
00025 #include "PulseFinder.hh"
00026 #include "Integrator.hh"
00027 #include "EvalRois.hh"
00028 #include "SpectrumMaker.hh"
00029 #include "ConvertData.hh"
00030 #include "SumChannels.hh"
00031 #include "TriggerHistory.hh"
00032 
00033 #include "runinfo.hh"
00034 
00035 #include "kbhit.h"
00036 #include <exception>
00037 #include <string>
00038 #include "Message.hh"
00039 #include <time.h>
00040 #include <fstream>
00041 #include <algorithm>
00042 #include <numeric>
00043 #include "boost/thread/thread.hpp"
00044 bool stop_run = false;
00045 
//utility class to print run statistics
//Holds the number of events and bytes counted since the last report,
//together with the wall-clock time (from time(0)) of the last reset.
struct PrintStats{
  time_t last_print_time;             //time of the most recent Clear()
  unsigned long events_processed;     //events counted since last Clear()
  unsigned long long bytes_processed; //bytes counted since last Clear()
  //zero both counters and restart the reporting interval at the current time
  void Clear() { 
    last_print_time = time(0); 
    events_processed = 0;
    bytes_processed = 0;
  }
  //a new object starts with empty counters and the interval beginning now
  PrintStats() { Clear(); }
};

//Write a one-line summary of stats to out: seconds elapsed since the last
//Clear(), events counted, and the average data rate in kiB/s.
//Returns out so the call can be chained.
std::ostream& operator<<(std::ostream& out, const PrintStats& stats){
  const time_t elapsed = time(0) - stats.last_print_time;
  //time(0) has one-second resolution, so a report requested within the same
  //second as the last Clear() sees elapsed == 0 (and a clock stepped
  //backwards gives a negative value). Use at least one second for the rate
  //so the integer division below can never be a division by zero.
  const unsigned long long rate_seconds = 
    elapsed > 0 ? static_cast<unsigned long long>(elapsed) : 1ULL;
  out<<"In last "<<elapsed<<" seconds, processed "
     <<stats.events_processed<<" events at "
     <<stats.bytes_processed/rate_seconds/1024<<" kiB/s";
  return out;
}
00065 
00066 //utility class to add real-time spectra
00067 class SpectrumAdder{
00068   std::vector<SpectrumMaker*>& _spectra;
00069 public:
00070   SpectrumAdder(std::vector<SpectrumMaker*>& v) : _spectra(v) {}
00071   std::istream& operator()(std::istream& in){
00072     //first parameter is the name of the spectrum module
00073     std::string name;
00074     in>>name;
00075     _spectra.push_back(new SpectrumMaker(name));
00076     //expect to read parameters immediately after name
00077     in>>*_spectra.back();
00078     return in;
00079   }
00080 };
00081   
00082 class InsertComment{
00083   runinfo* _info;
00084 public:
00085   InsertComment(runinfo* info) : _info(info) {}
00086   int operator()(const char* val){ 
00087     _info->SetMetadata("comment",std::string(val)) ; return 0; 
00088   }
00089 };
00090 
00091 int main(int argc, char** argv)
00092 {
00093   
00094   //set up the config handler
00095   ConfigHandler* config = ConfigHandler::GetInstance();
00096   
00097   //register all processing modules
00098   EventHandler* modules = EventHandler::GetInstance();
00099   //only RawWriter and reading headers go synchronously 
00100   RawWriter* writer = modules->AddModule<RawWriter>();
00101   ConvertData* read_headers = modules->AddModule<ConvertData>("ReadHeaders");
00102   read_headers->SetHeadersOnly(true);
00103   
00104   std::vector<AsyncEventHandler*> async_threads;
00105   
00106   //All analyzing modules run on a single asynchronous thread
00107   AsyncEventHandler thread1;
00108   modules->AddAsyncReceiver(&thread1);
00109   thread1.AddModule(new ConvertData);
00110   thread1.AddModule(new SumChannels);
00111   thread1.AddModule(new BaselineFinder);
00112   thread1.AddModule(new Integrator);
00113   thread1.AddModule(new EvalRois);
00114   async_threads.push_back(&thread1);
00115   // any more advanced functions?
00116   
00117   //ProcessedPlotter and RootGraphix share a thread and refresh speed
00118   AsyncEventHandler thread2;
00119   //add RootGraphix first so dependencies pass, but call afterward...
00120   RootGraphix* rootgraphix = new RootGraphix;
00121   modules->AddModule(rootgraphix, false, true);
00122   thread1.AddReceiver(&thread2);
00123   ProcessedPlotter* plotter = new ProcessedPlotter;
00124   thread2.AddModule(plotter);
00125   thread2.AddModule(new TriggerHistory);
00126   thread2.AddModule(rootgraphix, false);
00127   async_threads.push_back(&thread2);
00128   
00129   //allow as many spectra as we want
00130   std::vector<SpectrumMaker*> spectra;
00131   modules->RegisterParameter("spectra",spectra, 
00132                              "List of live analysis spectra to dispay");
00133   
00134   
00135   
00136   //keep this for backward compatibility
00137   modules->RegisterReadFunction("add_spectrum", SpectrumAdder(spectra),
00138                                 "Add a new real-time spectrum to display");
00139 
00140     
00141   //initialize some options for command switches
00142   long stop_events = -1, stop_time = -1; 
00143   long long stop_size = -1;
00144   int stattime=0;
00145   bool write_only = false;
00146   runinfo* info = modules->GetRunInfo();
00147   std::string testmode_file="";
00148   int testmode_dt = 0;
00149   int graphics_refresh = 1;
00150   config->AddCommandSwitch('i', "info", "Set run database info to <info>",
00151                            CommandSwitch::DefaultRead<runinfo>(*info),
00152                            "info");
00153   config->AddCommandSwitch('m',"message","Set database comment to <message>",
00154                            InsertComment(info),"message");
00155                            
00156   config->AddCommandSwitch('e',"stop_events","Stop after <n> events",
00157                            CommandSwitch::DefaultRead<long>(stop_events),
00158                            "n");
00159   config->AddCommandSwitch('t',"stop_time","Stop after <n> seconds",
00160                            CommandSwitch::DefaultRead<long>(stop_time),
00161                            "n");
00162   config->AddCommandSwitch('s',"stop_size","Stop after saving <size> MB",
00163                            CommandSwitch::DefaultRead<long long>(stop_size),
00164                            "size");
00165   config->AddCommandSwitch(' ',"write-only",
00166                            "Disable all modules except for the RawWriter",
00167                            CommandSwitch::SetValue<bool>(write_only, true));
00168   config->AddCommandSwitch(' ',"testmode","Fake DAQ from input file <file>",
00169                            CommandSwitch::DefaultRead<std::string>
00170                            (testmode_file) ,"file");
00171   config->AddCommandSwitch(' ',"testmode-dt","Sleep <N> ms between each event",
00172                            CommandSwitch::DefaultRead<int>(testmode_dt),"N");
00173   config->AddCommandSwitch(' ',"stat-time","Print stats every <secs> seconds",
00174                            CommandSwitch::DefaultRead<int>(stattime),"secs");
00175   config->AddCommandSwitch(' ',"refresh","Time in s between graphics update",
00176                            CommandSwitch::DefaultRead<int>(graphics_refresh),
00177                            "secs");
00178   config->RegisterParameter("stop_size",stop_size,
00179                             "Maximum file size in bytes before abort the run");
00180   config->RegisterParameter("stop_time",stop_time,
00181                             "Maximum time in seconds before we abort the run");
00182   config->RegisterParameter("stop_events",stop_events,
00183                             "Maximum number of events before we abort the run");
00184   config->RegisterParameter("stat-time",stattime,
00185                             "Time between printing of event/data rates");
00186   config->RegisterReadFunction("require_comment",DeprecatedParameter<bool>());
00187   V172X_Daq daq;
00188     
00189   config->SetProgramUsageString("daqman [options]");
00190   config->SetDefaultCfgFile("daqman.cfg");
00191   
00192   if(config->ProcessCommandLine(argc, argv))
00193     return 1;
00194   
00195   if(config->GetNCommandArgs()){
00196     Message(ERROR)<<"Too many arguments specified."<<std::endl;
00197     config->PrintSwitches(true);
00198   }
00199   
00200   AsyncEventHandler thread3;
00201   if(spectra.size() > 0){
00202     //for right now, put the spectra all on one thread
00203     thread1.AddReceiver(&thread3);
00204     for(size_t i=0; i<spectra.size(); ++i)
00205       thread3.AddModule((spectra[i]), true, false);
00206     async_threads.push_back(&thread3);
00207   }
00208   
00209   if(writer->enabled){
00210     modules->SetRunIDFromFilename(writer->GetFilename());
00211   }
00212   
00213   //see if we want to disable everything
00214   if(write_only){
00215     std::vector<BaseModule*>* mods = modules->GetListOfModules();
00216     for(size_t i=0; i < mods->size(); i++){
00217       if(mods->at(i) != writer) mods->at(i)->enabled = false;
00218     }
00219   }
00220   
00221   //set the graphics refresh time
00222   thread2.SetSleepMillisec(graphics_refresh*1000);
00223   
00224   //see if we're in test mode
00225   Reader* reader = 0;
00226   if(testmode_file!=""){
00227     reader = new Reader(testmode_file);
00228     if(!reader->IsOk()){
00229       Message(ERROR)<<"Testmode specified, but can't load file "
00230                     <<testmode_file<<".\n";
00231       return 1;
00232     }
00233   }
00234   
00235   //set the terminal into unbuffered mode
00236   keyboard board;
00237   //initialize all modules
00238   Message(INFO)<<"Initializing modules...\n";
00239   if(modules->Initialize()){
00240     Message(CRITICAL)<<"Unable to initialize all modules.\n";
00241     return 1;
00242   }
00243   //start up the asynchronous event handlers
00244   for(size_t i=0; i<async_threads.size(); ++i)
00245     async_threads[i]->StartRunning();
00246   try
00247     {
00248       if(!reader){
00249         Message(INFO)<<"Initializing DAQ...\n";
00250         if(daq.Initialize() == 0){
00251           Message(INFO)<<"Starting Run...\n";
00252           daq.StartRun();
00253         }
00254         else{
00255           Message(CRITICAL)<<"Initialization Error!"<<std::endl;
00256           modules->Finalize();
00257           return -1;
00258         }
00259       }
00260       //start the thread which takes commands
00261       time_t start_time = time(0);
00262       long events_downloaded = 0;
00263       long long data_downloaded = 0;
00264       PrintStats stats;
00265       while(!stop_run){
00266         //see if we should end the run
00267         if( (stop_time > 0 && time(0)-start_time >= stop_time) ||
00268             (stop_events > 0 && events_downloaded >= stop_events) ||
00269             (stop_size > 0 && writer->GetBytesWritten()/1000000 >= stop_size) ){
00270           stop_run = true;
00271         }
00272         //process user input
00273         if(board.kbhit()){
00274           //process user entered command
00275           char c = board.getch();
00276           switch(c){
00277           case 'q':
00278           case 'Q':
00279             stop_run = true;
00280             break;
00281           case 'p':
00282           case 'P':
00283             plotter->TogglePause();
00284             break;
00285           default:
00286             Message(ERROR)<<"Unknown control character '"<<c<<"'\n";
00287           };
00288         }
00289         if(stop_run) break;
00290         //get the next event
00291         RawEventPtr evt;
00292         if(reader){
00293           evt = reader->GetNextEvent();
00294           if(testmode_dt>0)
00295             boost::this_thread::sleep(boost::posix_time::millisec(testmode_dt));
00296         }
00297         else
00298           evt = daq.GetNextEvent(500000);
00299         if(!evt){
00300           //see if there's an error
00301           if(reader){
00302             Message(INFO)<<"Reached end of file, exiting testmode.\n";
00303             stop_run = true;
00304             break;
00305           }
00306           else if(daq.GetStatus() != BaseDaq::NORMAL){
00307             Message(ERROR)<<"An error occurred while getting next event.\n";
00308             Message(ERROR)<<"Attempting to abort run...\n";
00309             stop_run = true;
00310             break;
00311           }
00312           else{
00313             Message(DEBUG)<<"Waiting for new event ready in memory...\n";
00314             continue;
00315           }
00316         }
00317         if(modules->Process(evt)){
00318           Message(ERROR)<<"Problem encountered processing event.\n";
00319           //stop_run = true;
00320           continue;
00321         }
00322 
00323         data_downloaded += evt->GetDataSize();
00324         events_downloaded++;
00325         stats.events_processed++;
00326         stats.bytes_processed += evt->GetDataSize();
00327         if(stattime>0 && time(0)-stats.last_print_time >= stattime){
00328           Message(INFO)<<stats<<std::endl;
00329           stats.Clear();
00330         }
00331       }
00332       Message(INFO)<<"Ending Run....\n";
00333       if(!reader){
00334         Message(DEBUG)<<"Processing remaining events in queue...\n";
00335         daq.EndRun();
00336         RawEventPtr evt;
00337         while((stop_events<=0 || events_downloaded < stop_events) &&
00338               daq.GetStatus() == BaseDaq::NORMAL &&
00339               (evt = daq.GetNextEvent()) && 
00340               !modules->Process(evt)){
00341           data_downloaded += evt->GetDataSize();
00342           events_downloaded++;
00343           stats.events_processed++;
00344           stats.bytes_processed += evt->GetDataSize();
00345           
00346         }
00347       }
00348       time_t delta_time = time(0)-start_time;
00349       //end the asyncronous threads
00350       for(size_t i=0; i<async_threads.size(); ++i)
00351         async_threads[i]->StopRunning();
00352       
00353       modules->Finalize();
00354       //print out some statistics
00355       Message(INFO)<<events_downloaded<<" events processed.\n";
00356       Message(INFO)<<delta_time<<" seconds elapsed.\n";
00357       if(delta_time>0){
00358         Message(INFO)<<data_downloaded<<" total bytes processed at "
00359                      <<data_downloaded/(delta_time)
00360                      <<" bytes/s\n";
00361       }
00362     }
00363   catch(std::exception &e)
00364     {
00365       std::cerr<<"Caught Exception: "<<e.what()<<"\n";
00366     }
00367   
00368   return daq.GetStatus();
00369 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Defines

Generated on 20 Jun 2014 for daqman by  doxygen 1.6.1