AsyncEventHandler.cc

00001 #include "AsyncEventHandler.hh"
00002 #include "BaseModule.hh"
00003 #include "EventHandler.hh"
00004 
00005 #ifndef SINGLETHREAD
00006 #include "boost/thread.hpp"
00007 #include "boost/thread/mutex.hpp"
00008 #endif
00009 
00010 AsyncEventHandler::AsyncEventHandler() : _running(false), _sleeptime(0), 
00011                                          _blocking(false)
00012 {}
00013 
00014 AsyncEventHandler::~AsyncEventHandler()
00015 {
00016   if(_running){
00017     //this really shouldn't happen, but make sure we end properly...
00018     StopRunning();
00019   }
00020 }
00021 
00022 void AsyncEventHandler::Reset()
00023 {
00024   _modules.clear();
00025   _receivers.clear();
00026 }
00027 
00028 int AsyncEventHandler::AddModule(BaseModule* mod,
00029                                  bool register_to_eventhandler,
00030                                  bool register_parameters)
00031 {
00032   if(mod){
00033     _modules.push_back(mod);
00034     if(register_to_eventhandler) {
00035       EventHandler::GetInstance()->AddModule(mod, false, register_parameters);
00036     }
00037   }
00038   return 0;
00039 }
00040 
00041 int AsyncEventHandler::AddReceiver(AsyncEventHandler* receiver)
00042 {
00043   if(receiver)
00044     _receivers.push_back(receiver);
00045   return _receivers.size();
00046 }
00047 
00048 int AsyncEventHandler::Process(EventPtr evt)
00049 {
00050   //noop if no multithread
00051 #ifdef SINGLETHREAD
00052   Message(WARNING)<<"Attempt to use AsyncEventHandler with multithreading disabled!\n";
00053 #else
00054   if(_running){
00055     boost::mutex::scoped_lock lock(_event_mutex);
00056     while(_blocking && _next_event){
00057       //only go if _next_event is not filled
00058       lock.unlock();
00059       boost::this_thread::sleep(boost::posix_time::millisec(1));
00060       lock.lock();
00061     }
00062     _next_event = evt; 
00063     _event_ready.notify_one();
00064   }
00065 #endif
00066   return 0;
00067 }
00068 
00069 int AsyncEventHandler::StartRunning()
00070 {
00071 #ifndef SINGLETHREAD
00072   //if we have no enabled modules, don't run
00073   int enabled_modules = 0;
00074   for(size_t i=0; i<_modules.size(); ++i){
00075     if(_modules[i]->enabled){
00076       ++enabled_modules;
00077     }
00078   }
00079   if(enabled_modules == 0){
00080     Message(DEBUG)<<"AsyncEventHandler not running: no enabled modules.\n";
00081     return 1;
00082   }
00083   //start a thread
00084   _running = true;
00085   typedef boost::shared_ptr<boost::thread> _tp;
00086   _threadptr = _tp(new boost::thread(boost::ref(*this)));
00087   Message(DEBUG)<<"AsyncEventHandler running on thread "<<_threadptr->get_id()
00088                 <<" with sleeptime "<<_sleeptime<<" ms "
00089                 <<" contains "<<enabled_modules<<" enabled modules.\n";
00090   return 0;
00091 #else
00092   Message(WARNING)<<"Attempt to use AsyncEventHandler with multithreading disabled!\n";
00093   return 1; //only here if singlethread mode compiled
00094 #endif
00095 }
00096 
00097 int AsyncEventHandler::StopRunning()
00098 {
00099   if(!_running)
00100     return 1;
00101   _running = false;
00102 #ifndef SINGLETHREAD
00103   //wake up the thread if it's sleeping
00104   Message(DEBUG)<<"Ending AsyncEventHandler on thread "<<_threadptr->get_id()
00105                 <<"...\n";
00106   Process(EventPtr());
00107   _threadptr->join();
00108 #endif
00109   return 0;
00110   
00111 }
00112 
00113 void AsyncEventHandler::operator()()
00114 {
00115 #ifndef SINGLETHREAD
00116   //need a dummy mutex for condition_variable to work
00117   boost::mutex dummy_mutex;
00118   EventPtr current_event;
00119   while(_running){
00120     boost::mutex::scoped_lock lock(_event_mutex);
00121     if(!_next_event || _next_event == current_event){
00122       _event_ready.timed_wait(lock,boost::posix_time::millisec(1000));
00123       continue;
00124     }
00125     //if we get here, process the event
00126     current_event = _next_event;
00127     if(!_blocking)
00128       lock.unlock();
00129     for(size_t i=0; i<_modules.size(); ++i){
00130       if(_modules[i]->enabled){
00131         _modules[i]->HandleEvent(current_event);
00132         //boost::this_thread::sleep(boost::posix_time::microsec(2) );
00133       }
00134     }
00135     //done processing, hand off to receivers
00136     for(size_t i=0; i<_receivers.size(); ++i){
00137       _receivers[i]->Process(current_event);
00138     }
00139     if(!_blocking){
00140       if(_sleeptime>0){
00141         boost::this_thread::sleep(boost::posix_time::millisec(_sleeptime));
00142       }
00143       else
00144         boost::this_thread::yield();
00145     }
00146     else{
00147       _next_event = EventPtr();
00148     }
00149   }
00150 #endif
00151 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Defines

Generated on 20 Jun 2014 for daqman by  doxygen 1.6.1