00001 /*---------------------------------------------------------- 00002 BaseDaq.cc 00003 00004 These are the definitions of functions declared in 00005 BaseDaq.hh 00006 For blackbox explanations of these functions, please see 00007 the header file 00008 00009 00010 ---------------------------------------------------------*/ 00011 00012 00013 #include "BaseDaq.hh" 00014 #include "RawEvent.hh" 00015 #include <iostream> 00016 #include <stdexcept> 00017 #include <boost/ref.hpp> 00018 #include "boost/date_time/posix_time/posix_time_duration.hpp" 00019 #include "Message.hh" 00020 00021 bool BaseDaq::_is_constructed = false; 00022 00023 BaseDaq::BaseDaq() throw(std::runtime_error): 00024 _status(NORMAL), _is_running(false) 00025 { 00026 if(_is_constructed){ 00027 //only one instance allowed! 00028 Message(EXCEPTION)<<"BaseDaq constructor called, but only one instance is allowed!"<<std::endl; 00029 throw std::runtime_error("multiple BaseDaq construction"); 00030 } 00031 _is_constructed=true; 00032 _n_queuesize_warnings = 0; 00033 } 00034 00035 BaseDaq::~BaseDaq() 00036 { 00037 //what needs to be deleted here?? 00038 if(_is_running) EndRun(); 00039 _is_constructed=false; 00040 } 00041 00042 00043 int BaseDaq::StartRun() 00044 { 00045 if (_is_running) 00046 { 00047 Message(ERROR)<<"Cannot start a new run without terminating the previous one!"<<std::endl; 00048 return -1; 00049 } 00050 else 00051 _is_running = true; 00052 00053 _n_queuesize_warnings = 0; 00054 while(!_events_queue.empty()) 00055 _events_queue.pop(); 00056 //start new thread and run collect data 00057 Message(DEBUG)<<"Starting daq thread..."<<std::endl; 00058 _daq_thread = boost::thread(boost::ref(*this)); 00059 00060 return 0; 00061 } 00062 00063 int BaseDaq::EndRun(bool force) 00064 { 00065 if (!_is_running) 00066 { 00067 Message(ERROR)<<"Cannot terminate non-existant run."; 00068 return 1; 00069 } 00070 else _is_running = false; 00071 if(force) _daq_thread.interrupt(); 00072 _daq_thread.join(); 00073 /*while(!_events_queue.empty()){ 00074 RawEventPtr next = _events_queue.front(); 00075 next->GetThreadPointer()->join(); 00076 _events_queue.pop(); 00077 }*/ 00078 00079 return 0; 00080 } 00081 00082 RawEventPtr BaseDaq::GetNextEvent(int timeout) 00083 { 00084 if(GetStatus() != NORMAL){ 00085 return RawEventPtr(); 00086 } 00087 //lock the mutex guarding the processed event queue 00088 boost::mutex::scoped_lock lock(_queue_mutex); 00089 if(!_is_running && _events_queue.empty()) 00090 return RawEventPtr(); 00091 //loop until an event is ready 00092 while(_events_queue.empty() ){ 00093 if(timeout < 0) 00094 _event_ready.wait(lock); 00095 else{ 00096 if(!_event_ready.timed_wait(lock,boost::posix_time::microsec(timeout))) 00097 return RawEventPtr(); 00098 } 00099 } 00100 RawEventPtr next(_events_queue.front()); 00101 _events_queue.pop(); 00102 lock.unlock(); 00103 _event_taken.notify_all(); 00104 return next; 00105 } 00106 00107 void BaseDaq::PostEvent(RawEventPtr event) 00108 { 00109 boost::mutex::scoped_lock lock(_queue_mutex); 00110 do{ 00111 if(_events_queue.size() < MAX_QUEUE_SIZE){ 00112 _events_queue.push(event); 00113 _event_ready.notify_all(); 00114 break; 00115 } 00116 //if we get here, the event queue is full 00117 if(_n_queuesize_warnings++ < 1){ 00118 Message(WARNING)<<_events_queue.size()<<" events waiting " 00119 <<"to be processed; trigger rate may be too high.\n" 00120 <<"\tThere will be deadtime in this run.\n"; 00121 } 00122 _event_taken.timed_wait(lock, boost::posix_time::microsec(1000)); 00123 00124 }while(_is_running); 00125 00126 00127 } 00128