BaseDaq.cc

00001 /*----------------------------------------------------------
00002   BaseDaq.cc
00003   
00004   These are the definitions of functions declared in
00005   BaseDaq.hh
00006   For blackbox explanations of these functions, please see
00007   the header file
00008   
00009 
00010   ---------------------------------------------------------*/
00011 
00012 
00013 #include "BaseDaq.hh"
00014 #include "RawEvent.hh"
00015 #include <iostream>
00016 #include <stdexcept>
00017 #include <boost/ref.hpp>
00018 #include "boost/date_time/posix_time/posix_time_duration.hpp"
00019 #include "Message.hh"
00020 
00021 bool BaseDaq::_is_constructed = false;
00022 
00023 BaseDaq::BaseDaq() throw(std::runtime_error): 
00024   _status(NORMAL), _is_running(false)
00025 {
00026   if(_is_constructed){
00027     //only one instance allowed!
00028     Message(EXCEPTION)<<"BaseDaq constructor called, but only one instance is allowed!"<<std::endl;
00029     throw std::runtime_error("multiple BaseDaq construction");
00030   }
00031   _is_constructed=true;
00032   _n_queuesize_warnings = 0;
00033 }
00034 
00035 BaseDaq::~BaseDaq()
00036 {
00037 //what needs to be deleted here??
00038   if(_is_running) EndRun();
00039   _is_constructed=false;
00040 }
00041 
00042 
00043 int BaseDaq::StartRun()
00044 {
00045   if (_is_running)
00046     {
00047       Message(ERROR)<<"Cannot start a new run without terminating the previous one!"<<std::endl;
00048       return -1;
00049     }
00050   else 
00051     _is_running = true;
00052   
00053   _n_queuesize_warnings = 0;
00054   while(!_events_queue.empty())
00055     _events_queue.pop();
00056   //start new thread and run collect data
00057   Message(DEBUG)<<"Starting daq thread..."<<std::endl;
00058   _daq_thread = boost::thread(boost::ref(*this));
00059   
00060   return 0;
00061 }
00062 
00063 int BaseDaq::EndRun(bool force)
00064 {
00065     if (!_is_running)
00066     {
00067       Message(ERROR)<<"Cannot terminate non-existant run.";
00068       return 1;
00069     }
00070     else _is_running = false;
00071     if(force) _daq_thread.interrupt();
00072     _daq_thread.join();
00073     /*while(!_events_queue.empty()){
00074       RawEventPtr next = _events_queue.front();
00075       next->GetThreadPointer()->join();
00076       _events_queue.pop();
00077       }*/
00078     
00079     return 0;    
00080 }
00081 
00082 RawEventPtr BaseDaq::GetNextEvent(int timeout)
00083 {
00084   if(GetStatus() != NORMAL){
00085     return RawEventPtr();
00086   }
00087   //lock the mutex guarding the processed event queue
00088   boost::mutex::scoped_lock lock(_queue_mutex);
00089   if(!_is_running && _events_queue.empty())
00090     return RawEventPtr();
00091   //loop until an event is ready
00092   while(_events_queue.empty() ){
00093     if(timeout < 0)
00094       _event_ready.wait(lock);
00095     else{
00096       if(!_event_ready.timed_wait(lock,boost::posix_time::microsec(timeout)))
00097         return RawEventPtr();
00098     }
00099   }
00100   RawEventPtr next(_events_queue.front());
00101   _events_queue.pop();
00102   lock.unlock();
00103   _event_taken.notify_all();
00104   return next;
00105 }
00106 
00107 void BaseDaq::PostEvent(RawEventPtr event)
00108 {
00109   boost::mutex::scoped_lock lock(_queue_mutex);
00110   do{
00111     if(_events_queue.size() < MAX_QUEUE_SIZE){
00112       _events_queue.push(event);
00113       _event_ready.notify_all();
00114       break;
00115     }
00116     //if we get here, the event queue is full
00117     if(_n_queuesize_warnings++ < 1){
00118       Message(WARNING)<<_events_queue.size()<<" events waiting "
00119                       <<"to be processed; trigger rate may be too high.\n"
00120                       <<"\tThere will be deadtime in this run.\n";
00121     }
00122     _event_taken.timed_wait(lock, boost::posix_time::microsec(1000));
00123     
00124   }while(_is_running);
00125     
00126   
00127 }
00128 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Defines

Generated on 20 Jun 2014 for daqman by  doxygen 1.6.1