EventHandler.cc
00001 #include <stdexcept>
00002 #include <stdlib.h>
00003 #include <algorithm>
00004 #include "EventHandler.hh"
00005 #include "ConfigHandler.hh"
00006 #include "CommandSwitchFunctions.hh"
00007 #include "Message.hh"
00008 #include "BaseModule.hh"
00009 #include "AsyncEventHandler.hh"
00010 #include <stdexcept>
00011 #include <sstream>
00012
00013
00014 class EnableModule{
00015 const bool _enable;
00016 public:
00017 EnableModule(const bool& enable) : _enable(enable) {}
00018 int operator()(const char* modname){
00019 EventHandler* h = EventHandler::GetInstance();
00020 BaseModule* mod = (BaseModule*)(h->GetParameter(modname));
00021 if(!mod) {
00022 Message(ERROR)<<"Unknown module "<<modname<<std::endl;
00023 throw std::invalid_argument("Unknown module");
00024 }
00025 mod->enabled = _enable;
00026 return 0;
00027 }
00028 };
00029
00030 int ListModules(const char*){
00031 std::cout<<"The available modules are:\n";
00032 const std::vector<BaseModule*>* modules =
00033 EventHandler::GetInstance()->GetListOfModules();
00034 for(size_t i=0; i< modules->size(); i++){
00035 std::cout<<"\t"<<modules->at(i)->GetName()<<std::endl;
00036 }
00037 exit(0);
00038 }
00039
00040
00041
00042 EventHandler::EventHandler() :
00043 ParameterList("modules","Takes raw events and delivers it to all enabled modules for processing"),
00044 _current_event(), _is_initialized(false), run_id(-1)
00045 {
00046 ConfigHandler* config = ConfigHandler::GetInstance();
00047 config->RegisterParameter(this->GetDefaultKey(),*this);
00048 config->RegisterParameter(_runinfo.GetDefaultKey(), _runinfo);
00049
00050 RegisterReadFunction("access_database", DeprecatedParameter<bool>());
00051
00052 RegisterParameter("fail_on_bad_cal", _fail_on_bad_cal=false,
00053 "Fail to initialize if unable to find calibration data");
00054 RegisterParameter("run_parallel", _run_parallel=false,
00055 "Do we process modules in series, or give them threads?");
00056 config->AddCommandSwitch(' ',"enable","enable <module>",
00057 EnableModule(true),"module");
00058 config->AddCommandSwitch(' ',"disable","disable <module>",
00059 EnableModule(false),"module");
00060 config->AddCommandSwitch(' ',"list-modules","list the available modules",
00061 ListModules);
00062
00063
00064
00065
00066 }
00067
00068
00069 EventHandler::~EventHandler()
00070 {
00071 if(_is_initialized)
00072 Finalize();
00073 for(size_t i=0; i<_modules.size(); i++){
00074 delete _modules[i];
00075 }
00076 _modules.clear();
00077
00078 }
00079
00080 EventHandler* EventHandler::GetInstance()
00081 {
00082 static EventHandler handler;
00083 return &handler;
00084 }
00085
00086 int EventHandler::AddModule(BaseModule* mod, bool processme, bool registerme)
00087 {
00088 if(!mod)
00089 return -1;
00090
00091 if(std::find(_modules.begin(),_modules.end(),mod)==_modules.end()){
00092 _modules.push_back(mod);
00093 }
00094 if(processme)
00095 _processing_modules.push_back(mod);
00096 if(registerme)
00097 RegisterParameter(mod->GetName(), *mod);
00098 return _modules.size();
00099 }
00100
00101 BaseModule* EventHandler::GetModule(const std::string& modname)
00102 {
00103 for(size_t i=0; i<_modules.size(); ++i){
00104 if(_modules[i]->GetName() == modname)
00105 return _modules[i];
00106 }
00107
00108 return 0;
00109 }
00110
00111 int EventHandler::Initialize()
00112 {
00113 if(_is_initialized) return 1;
00114 Message(DEBUG)<<"EventHandler::Initialize() called with "<<_modules.size()
00115 <<" registered modules...\n";
00116 _is_initialized = true;
00117
00118
00119
00120 try{
00121 ConfigHandler::GetInstance()->LoadParameterList(&_runinfo);
00122 }
00123 catch(std::exception& e){
00124 Message(WARNING)<<"Saved runinfo will not be used in this processing!\n";
00125 }
00126 if(_runinfo.runid == -1)
00127 _runinfo.runid = run_id;
00128
00129
00130 _runinfo.ResetRunStats();
00131
00132
00133 std::set<std::string> enabled_modules;
00134
00135 for(size_t i = 0; i<_modules.size(); i++){
00136 BaseModule* mod = _modules[i];
00137 if(!mod->enabled){
00138 Message(DEBUG)<<"Module "<<mod->GetName()<<" was disabled manually.\n";
00139 continue;
00140 }
00141
00142 const std::set<std::string>* deps = mod->GetDependencies();
00143 std::set<std::string>::const_iterator dep = deps->begin();
00144 for( ; dep != deps->end(); ++dep){
00145 if( enabled_modules.find(*dep) == enabled_modules.end()){
00146 mod->enabled = false;
00147 Message(DEBUG)<<"Module "<<mod->GetName()<<" was disabled by "
00148 <<"failed dependency "<<*dep<<".\n";
00149 break;
00150 }
00151 }
00152 if(!mod->enabled)
00153 continue;
00154
00155 Message(DEBUG)<<"Module "<<mod->GetName()<<" is enabled. Initializing...\n";
00156 enabled_modules.insert(mod->GetName());
00157 if(mod->Initialize()){
00158 Message(CRITICAL)<<"Unable to initialize module "<<mod->GetName()
00159 <<"; aborting.\n";
00160 _is_initialized = false;
00161 return 1;
00162 }
00163 if(_run_parallel){
00164
00165 AsyncEventHandler* ah = new AsyncEventHandler;
00166 ah->SetBlockingStatus(true);
00167 ah->AddModule(mod,false);
00168 if(_para_handlers.size()==0)
00169 AddAsyncReceiver(ah);
00170 else
00171 _para_handlers.back()->AddReceiver(ah);
00172 ah->StartRunning();
00173 _para_handlers.push_back(ah);
00174 }
00175 }
00176
00177 return 0;
00178 }
00179
00180 int EventHandler::Process(RawEventPtr raw)
00181 {
00182 if(!raw){
00183 Message(ERROR)<<"Attempted to process empty event pointer.\n";
00184 return 1;
00185 }
00186 EventPtr evt(new Event(raw));
00187 return Process(evt);
00188 }
00189
00190 int EventHandler::Process(EventPtr evt)
00191 {
00192
00193 if(!_is_initialized) {
00194 Message(ERROR)<<"Attempted to process events before initialization!\n";
00195 throw std::runtime_error("EventHandler::uninitialized process request");
00196 return 1;
00197 }
00198
00199 int proc_fail = 0;
00200
00201 _current_event = evt;
00202
00203 _current_event->GetEventData()->run_id = run_id;
00204 if(!_run_parallel){
00205 std::vector<BaseModule*>::iterator it;
00206 for(it=_processing_modules.begin(); it!=_processing_modules.end(); it++){
00207 BaseModule* mod = *it;
00208 if(mod->enabled){
00209
00210 proc_fail += mod->HandleEvent(_current_event);
00211 }
00212 }
00213 }
00214 for(size_t i=0; i < _async_receivers.size(); ++i)
00215 _async_receivers[i]->Process(evt);
00216
00217 return proc_fail;
00218 }
00219
00220 int EventHandler::Finalize()
00221 {
00222 if(!_is_initialized) {
00223 Message(WARNING)<<"EventHandler::Finalize() called uninitialized!\n";
00224 }
00225 _is_initialized = false;
00226
00227 for(size_t i=0; i<_async_receivers.size(); ++i){
00228 _async_receivers[i]->Process(EventPtr());
00229 }
00230 if(_run_parallel){
00231 for(size_t i=0; i<_para_handlers.size(); ++i){
00232
00233 _para_handlers[i]->Process(EventPtr());
00234 _para_handlers[i]->StopRunning();
00235 delete _para_handlers[i];
00236
00237 if(i==0)
00238 _async_receivers.pop_back();
00239 }
00240 }
00241 int final_fail = 0;
00242 Message(DEBUG)<<"Finalizing "<<_modules.size()<<" modules..."<<std::endl;
00243
00244
00245 std::vector<BaseModule*>::iterator it;
00246 for(it = _modules.begin(); it != _modules.end(); it++){
00247 BaseModule* mod = *it;
00248 if(mod->enabled){
00249 Message(DEBUG)<<"Finalizing module "<<mod->GetName()<<std::endl;
00250 final_fail += mod->Finalize();
00251 }
00252 }
00253
00254 _runinfo.Init(true);
00255 Message(DEBUG)<<"Done finalizing modules.\n";
00256 if(final_fail)
00257 Message(WARNING)<<"Finalization returned error code "<<final_fail<<"\n";
00258 return final_fail;
00259 }
00260
00261 int EventHandler::SetRunIDFromFilename(const std::string& filename)
00262 {
00263 run_id = -1;
00264
00265 std::string filepart = filename;
00266
00267 while(*filepart.rbegin() == '/')
00268 filepart.resize(filepart.size()-1);
00269 size_t basept = filepart.find_last_of('/');
00270 if( basept == std::string::npos)
00271 basept = 0;
00272 else
00273 basept++;
00274 filepart = filepart.substr(basept);
00275 size_t runpt = filepart.find("Run");
00276 if( runpt != std::string::npos && filepart.size()>runpt+8){
00277 run_id = atoi(filepart.substr(runpt+3,6).c_str());
00278 }
00279 else{
00280
00281 size_t underscore = filepart.find_last_of('_');
00282 if(underscore != std::string::npos && filepart.size() > underscore+10){
00283 std::stringstream s(filepart.substr(underscore+1,10));
00284 s >> run_id;
00285 }
00286 }
00287
00288 Message(INFO)<<"Setting runid to "<<run_id<<" based on filename "<<filename
00289 <<std::endl;
00290 return run_id;
00291 }
00292