EventHandler.cc

00001 #include <stdexcept>
00002 #include <stdlib.h>
00003 #include <algorithm>
00004 #include "EventHandler.hh"
00005 #include "ConfigHandler.hh"
00006 #include "CommandSwitchFunctions.hh"
00007 #include "Message.hh"
00008 #include "BaseModule.hh"
00009 #include "AsyncEventHandler.hh"
00010 #include <stdexcept>
00011 #include <sstream>
00012 
00013 //These functions are used by the ConfigHandler as command switches
00014 class EnableModule{
00015   const bool _enable;
00016 public:
00017   EnableModule(const bool& enable) : _enable(enable) {}
00018   int operator()(const char* modname){
00019     EventHandler* h = EventHandler::GetInstance();
00020     BaseModule* mod = (BaseModule*)(h->GetParameter(modname));
00021     if(!mod) {
00022       Message(ERROR)<<"Unknown module "<<modname<<std::endl;
00023       throw std::invalid_argument("Unknown module");
00024     }
00025     mod->enabled = _enable;
00026     return 0;
00027   }
00028 };
00029 
00030 int ListModules(const char*){
00031   std::cout<<"The available modules are:\n";
00032   const std::vector<BaseModule*>* modules = 
00033     EventHandler::GetInstance()->GetListOfModules();
00034   for(size_t i=0; i< modules->size(); i++){
00035     std::cout<<"\t"<<modules->at(i)->GetName()<<std::endl;
00036   }
00037   exit(0);
00038 }
00039 
00040 //EventHandler member functions
00041 
00042 EventHandler::EventHandler() : 
00043   ParameterList("modules","Takes raw events and delivers it to all enabled modules for processing"), 
00044   _current_event(), _is_initialized(false), run_id(-1)
00045 {
00046   ConfigHandler* config = ConfigHandler::GetInstance();
00047   config->RegisterParameter(this->GetDefaultKey(),*this);
00048   config->RegisterParameter(_runinfo.GetDefaultKey(), _runinfo);
00049   
00050   RegisterReadFunction("access_database", DeprecatedParameter<bool>());
00051   
00052   RegisterParameter("fail_on_bad_cal", _fail_on_bad_cal=false,
00053                     "Fail to initialize if unable to  find calibration data");
00054   RegisterParameter("run_parallel", _run_parallel=false,
00055                     "Do we process modules in series, or give them threads?");
00056   config->AddCommandSwitch(' ',"enable","enable <module>",
00057                            EnableModule(true),"module");
00058   config->AddCommandSwitch(' ',"disable","disable <module>",
00059                            EnableModule(false),"module");
00060   config->AddCommandSwitch(' ',"list-modules","list the available modules",
00061                            ListModules);
00062   /*config->AddCommandSwitch(' ',"no-db","Skip attempts to access database",
00063                            CommandSwitch::SetValue<bool>(_access_database,false)
00064                            ); */
00065   
00066 }
00067 //Copy, assignment constructors not provided
00068 
00069 EventHandler::~EventHandler()
00070 {
00071   if(_is_initialized)
00072     Finalize();
00073   for(size_t i=0; i<_modules.size(); i++){
00074     delete _modules[i];
00075   }
00076   _modules.clear();
00077   
00078 }
00079 
00080 EventHandler* EventHandler::GetInstance()
00081 {
00082   static EventHandler handler;
00083   return &handler;
00084 }
00085 
00086 int EventHandler::AddModule(BaseModule* mod, bool processme, bool registerme)
00087 {
00088   if(!mod)
00089     return -1;
00090   //make sure module is not already considered
00091   if(std::find(_modules.begin(),_modules.end(),mod)==_modules.end()){
00092     _modules.push_back(mod);
00093   }
00094   if(processme)
00095     _processing_modules.push_back(mod);
00096   if(registerme)
00097     RegisterParameter(mod->GetName(), *mod);
00098   return _modules.size();
00099 }
00100 
00101 BaseModule* EventHandler::GetModule(const std::string& modname)
00102 {
00103   for(size_t i=0; i<_modules.size(); ++i){
00104     if(_modules[i]->GetName() == modname)
00105       return _modules[i];
00106   }
00107   //if we get here, module is not known
00108   return 0;
00109 }
00110 
00111 int EventHandler::Initialize()
00112 {
00113   if(_is_initialized) return 1;
00114   Message(DEBUG)<<"EventHandler::Initialize() called with  "<<_modules.size()
00115                 <<" registered modules...\n";
00116   _is_initialized = true;
00117   
00118   //initialize the runinfo
00119   //how to allow overriding runinfo settings?
00120   try{
00121     ConfigHandler::GetInstance()->LoadParameterList(&_runinfo);
00122   }
00123   catch(std::exception& e){
00124     Message(WARNING)<<"Saved runinfo will not be used in this processing!\n";
00125   }
00126   if(_runinfo.runid == -1)
00127     _runinfo.runid = run_id;
00128 
00129   //this info is in the raw file, so reset it:
00130   _runinfo.ResetRunStats();
00131   
00132   //first initialize all enabled modules
00133   std::set<std::string> enabled_modules;
00134   
00135   for(size_t i = 0; i<_modules.size(); i++){
00136     BaseModule* mod = _modules[i];
00137     if(!mod->enabled){
00138       Message(DEBUG)<<"Module "<<mod->GetName()<<" was disabled manually.\n";
00139       continue;
00140     }
00141     //see if all our dependencies are enabled 
00142     const std::set<std::string>* deps = mod->GetDependencies();
00143     std::set<std::string>::const_iterator dep = deps->begin();
00144     for( ; dep != deps->end(); ++dep){
00145       if( enabled_modules.find(*dep) == enabled_modules.end()){
00146         mod->enabled = false;
00147         Message(DEBUG)<<"Module "<<mod->GetName()<<" was disabled by "
00148                       <<"failed dependency "<<*dep<<".\n";
00149         break;
00150       }
00151     }
00152     if(!mod->enabled)
00153       continue;
00154     //if we get here, the module is enabled and ready to initialize
00155     Message(DEBUG)<<"Module "<<mod->GetName()<<" is enabled. Initializing...\n";
00156     enabled_modules.insert(mod->GetName());
00157     if(mod->Initialize()){
00158       Message(CRITICAL)<<"Unable to initialize module "<<mod->GetName()
00159                        <<"; aborting.\n";
00160       _is_initialized = false;
00161       return 1;
00162     }
00163     if(_run_parallel){
00164       //give this module an AsyncEventHandler
00165       AsyncEventHandler* ah = new AsyncEventHandler;
00166       ah->SetBlockingStatus(true);
00167       ah->AddModule(mod,false);
00168       if(_para_handlers.size()==0)
00169         AddAsyncReceiver(ah);
00170       else
00171         _para_handlers.back()->AddReceiver(ah);
00172       ah->StartRunning();
00173       _para_handlers.push_back(ah);
00174     }
00175   }
00176   
00177   return 0;
00178 }
00179 
00180 int EventHandler::Process(RawEventPtr raw)
00181 {
00182   if(!raw){
00183     Message(ERROR)<<"Attempted to process empty event pointer.\n";
00184     return 1;
00185   }
00186   EventPtr evt(new Event(raw));
00187   return Process(evt);
00188 }
00189 
00190 int EventHandler::Process(EventPtr evt)
00191 {  
00192   
00193   if(!_is_initialized) {
00194     Message(ERROR)<<"Attempted to process events before initialization!\n";
00195     throw std::runtime_error("EventHandler::uninitialized process request");
00196     return 1;
00197   }
00198   
00199   int proc_fail = 0;
00200   
00201   _current_event = evt;
00202   //set the run id here
00203   _current_event->GetEventData()->run_id = run_id;
00204   if(!_run_parallel){
00205     std::vector<BaseModule*>::iterator it;
00206     for(it=_processing_modules.begin(); it!=_processing_modules.end(); it++){
00207       BaseModule* mod = *it;
00208       if(mod->enabled){
00209         //Message(DEBUG)<<"Processing module "<<mod->GetName()<<std::endl;
00210         proc_fail += mod->HandleEvent(_current_event);
00211       }
00212     }
00213   }
00214   for(size_t i=0; i < _async_receivers.size(); ++i)
00215     _async_receivers[i]->Process(evt);
00216 
00217   return proc_fail;
00218 }
00219 
00220 int EventHandler::Finalize()
00221 {
00222   if(!_is_initialized) {
00223     Message(WARNING)<<"EventHandler::Finalize() called uninitialized!\n";
00224   }
00225   _is_initialized = false;
00226   //make sure the modules have finished
00227   for(size_t i=0; i<_async_receivers.size(); ++i){
00228     _async_receivers[i]->Process(EventPtr());
00229   }
00230   if(_run_parallel){
00231     for(size_t i=0; i<_para_handlers.size(); ++i){
00232       //this should block until done processing
00233       _para_handlers[i]->Process(EventPtr());
00234       _para_handlers[i]->StopRunning();
00235       delete _para_handlers[i];
00236       //note: assume no one has messed with the receiver list since Initialize
00237       if(i==0)
00238         _async_receivers.pop_back();
00239     }
00240   }
00241   int final_fail = 0;
00242   Message(DEBUG)<<"Finalizing "<<_modules.size()<<" modules..."<<std::endl;
00243   //finalization should go in opposite order of initialization
00244   //but that messes up root file writing, so go in same order...                
00245   std::vector<BaseModule*>::iterator it;
00246   for(it = _modules.begin(); it != _modules.end(); it++){
00247     BaseModule* mod = *it;
00248     if(mod->enabled){
00249       Message(DEBUG)<<"Finalizing module "<<mod->GetName()<<std::endl;
00250       final_fail += mod->Finalize();
00251     }
00252   }
00253   //reset the run info
00254   _runinfo.Init(true);
00255   Message(DEBUG)<<"Done finalizing modules.\n";
00256   if(final_fail)
00257     Message(WARNING)<<"Finalization returned error code "<<final_fail<<"\n";
00258   return final_fail;
00259 }   
00260  
00261 int EventHandler::SetRunIDFromFilename(const std::string& filename)
00262 {
00263   run_id = -1;
00264   //see if the filename matches the pattern *Run######.out.gz
00265   std::string filepart = filename;
00266   //remove any trailing '/'
00267   while(*filepart.rbegin() == '/')
00268     filepart.resize(filepart.size()-1);
00269   size_t basept = filepart.find_last_of('/');
00270   if( basept == std::string::npos) 
00271     basept = 0;
00272   else 
00273     basept++;
00274   filepart = filepart.substr(basept);
00275   size_t runpt = filepart.find("Run");
00276   if( runpt != std::string::npos && filepart.size()>runpt+8){
00277     run_id = atoi(filepart.substr(runpt+3,6).c_str());
00278   }
00279   else{
00280     //might be of the form <name>_yymmddHHMM.<suffix>
00281     size_t underscore = filepart.find_last_of('_');
00282     if(underscore != std::string::npos && filepart.size() > underscore+10){
00283       std::stringstream s(filepart.substr(underscore+1,10));
00284       s >> run_id;
00285     }
00286   }
00287 
00288   Message(INFO)<<"Setting runid to "<<run_id<<" based on filename "<<filename
00289                <<std::endl;
00290  return run_id;
00291 }
00292   
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Defines

Generated on 20 Jun 2014 for daqman by  doxygen 1.6.1