AsyncEventHandler.cc
00001 #include "AsyncEventHandler.hh"
00002 #include "BaseModule.hh"
00003 #include "EventHandler.hh"
00004
00005 #ifndef SINGLETHREAD
00006 #include "boost/thread.hpp"
00007 #include "boost/thread/mutex.hpp"
00008 #endif
00009
00010 AsyncEventHandler::AsyncEventHandler() : _running(false), _sleeptime(0),
00011 _blocking(false)
00012 {}
00013
00014 AsyncEventHandler::~AsyncEventHandler()
00015 {
00016 if(_running){
00017
00018 StopRunning();
00019 }
00020 }
00021
00022 void AsyncEventHandler::Reset()
00023 {
00024 _modules.clear();
00025 _receivers.clear();
00026 }
00027
00028 int AsyncEventHandler::AddModule(BaseModule* mod,
00029 bool register_to_eventhandler,
00030 bool register_parameters)
00031 {
00032 if(mod){
00033 _modules.push_back(mod);
00034 if(register_to_eventhandler) {
00035 EventHandler::GetInstance()->AddModule(mod, false, register_parameters);
00036 }
00037 }
00038 return 0;
00039 }
00040
00041 int AsyncEventHandler::AddReceiver(AsyncEventHandler* receiver)
00042 {
00043 if(receiver)
00044 _receivers.push_back(receiver);
00045 return _receivers.size();
00046 }
00047
00048 int AsyncEventHandler::Process(EventPtr evt)
00049 {
00050
00051 #ifdef SINGLETHREAD
00052 Message(WARNING)<<"Attempt to use AsyncEventHandler with multithreading disabled!\n";
00053 #else
00054 if(_running){
00055 boost::mutex::scoped_lock lock(_event_mutex);
00056 while(_blocking && _next_event){
00057
00058 lock.unlock();
00059 boost::this_thread::sleep(boost::posix_time::millisec(1));
00060 lock.lock();
00061 }
00062 _next_event = evt;
00063 _event_ready.notify_one();
00064 }
00065 #endif
00066 return 0;
00067 }
00068
00069 int AsyncEventHandler::StartRunning()
00070 {
00071 #ifndef SINGLETHREAD
00072
00073 int enabled_modules = 0;
00074 for(size_t i=0; i<_modules.size(); ++i){
00075 if(_modules[i]->enabled){
00076 ++enabled_modules;
00077 }
00078 }
00079 if(enabled_modules == 0){
00080 Message(DEBUG)<<"AsyncEventHandler not running: no enabled modules.\n";
00081 return 1;
00082 }
00083
00084 _running = true;
00085 typedef boost::shared_ptr<boost::thread> _tp;
00086 _threadptr = _tp(new boost::thread(boost::ref(*this)));
00087 Message(DEBUG)<<"AsyncEventHandler running on thread "<<_threadptr->get_id()
00088 <<" with sleeptime "<<_sleeptime<<" ms "
00089 <<" contains "<<enabled_modules<<" enabled modules.\n";
00090 return 0;
00091 #else
00092 Message(WARNING)<<"Attempt to use AsyncEventHandler with multithreading disabled!\n";
00093 return 1;
00094 #endif
00095 }
00096
00097 int AsyncEventHandler::StopRunning()
00098 {
00099 if(!_running)
00100 return 1;
00101 _running = false;
00102 #ifndef SINGLETHREAD
00103
00104 Message(DEBUG)<<"Ending AsyncEventHandler on thread "<<_threadptr->get_id()
00105 <<"...\n";
00106 Process(EventPtr());
00107 _threadptr->join();
00108 #endif
00109 return 0;
00110
00111 }
00112
00113 void AsyncEventHandler::operator()()
00114 {
00115 #ifndef SINGLETHREAD
00116
00117 boost::mutex dummy_mutex;
00118 EventPtr current_event;
00119 while(_running){
00120 boost::mutex::scoped_lock lock(_event_mutex);
00121 if(!_next_event || _next_event == current_event){
00122 _event_ready.timed_wait(lock,boost::posix_time::millisec(1000));
00123 continue;
00124 }
00125
00126 current_event = _next_event;
00127 if(!_blocking)
00128 lock.unlock();
00129 for(size_t i=0; i<_modules.size(); ++i){
00130 if(_modules[i]->enabled){
00131 _modules[i]->HandleEvent(current_event);
00132
00133 }
00134 }
00135
00136 for(size_t i=0; i<_receivers.size(); ++i){
00137 _receivers[i]->Process(current_event);
00138 }
00139 if(!_blocking){
00140 if(_sleeptime>0){
00141 boost::this_thread::sleep(boost::posix_time::millisec(_sleeptime));
00142 }
00143 else
00144 boost::this_thread::yield();
00145 }
00146 else{
00147 _next_event = EventPtr();
00148 }
00149 }
00150 #endif
00151 }