Main Page   Namespace List   Class Hierarchy   Compound List   File List   Namespace Members   Compound Members   File Members  

receiver.h

Go to the documentation of this file.
00001 #ifndef UTILS_RECEIVER_H
00002 #define UTILS_RECEIVER_H
00003 
00004 #include <list>
00005 
00006 #include <boost/thread/xtime.hpp>
00007 #include <boost/thread/condition.hpp>
00008 #include <boost/thread/thread.hpp>
00009 
00010 #define warn nowarn             // ignore warnings
00011 
00012 using namespace std;
00013 
00014 namespace utils {
00015 
00017 
00024 template <class T, class F, int TIMEOUT = 100>
00025 class receiver: private boost::noncopyable
00026 {
00027         F func;
00028         
00029         list<T> queue;
00030 
00031         boost::mutex            mutex;
00032     boost::condition    not_empty;
00033 
00034         bool                            please_quit;
00035         boost::mutex            please_quit_mutex;
00036 
00037         boost::thread           thread;
00038 public:
00040 
00045         receiver();
00046         
00048 
00053         void shutdown();
00054 
00056 
00063         void add(const T& v);
00064 
00065 private:        
00066         // proceed next items in the list (if any)
00067         void checkItems();
00068         
00069         friend class Runner;
00070         
00071         class Runner
00072         {
00073         public:
00074                 receiver& rec;
00075         
00076                 Runner(receiver& r): rec(r)
00077                         { }
00078                 
00079                 void operator()();
00080         };
00081 };
00082 
00083 template <class T, class F, int TIMEOUT>
00084 receiver<T,F,TIMEOUT>::receiver(): please_quit(false), thread(Runner(*this))
00085         { }
00086 
00087 template <class T, class F, int TIMEOUT>
00088 void receiver<T,F,TIMEOUT>::shutdown()
00089 { 
00090         {
00091                 warn("please quit.");
00092                 boost::mutex::scoped_lock lock(please_quit_mutex);                      
00093                 please_quit = true;
00094         }
00095 
00096         thread.join();
00097 }
00098 
00099 
00100 // adds an item to the receiver
00101 template <class T, class F, int TIMEOUT>
00102 void receiver<T,F,TIMEOUT>::add(const T& v)
00103 {       
00104         warn("adding an item.");
00105         boost::mutex::scoped_lock lock(mutex);
00106         queue.push_back(v);
00107 
00108         not_empty.notify_one();
00109 }
00110 
00111 // proceed next items in the list (if any)
00112 template <class T, class F, int TIMEOUT>
00113 void receiver<T,F,TIMEOUT>::checkItems()
00114 {
00115         warn("checkitems");
00116         boost::mutex::scoped_lock lock(mutex);
00117         if(queue.size() == 0) 
00118         {
00119                 warn("checkitems: waiting for not_empty");
00120             boost::xtime xt;
00121         boost::xtime_get(&xt, boost::TIME_UTC);
00122             xt.sec += TIMEOUT / 1000;           
00123         xt.nsec += (TIMEOUT % 1000) * 1000000;
00124                 if(xt.nsec >= 1000000000) {
00125                         xt.sec += 1;
00126                         xt.nsec -= 1000000000;
00127                 }
00128 
00129                 not_empty.timed_wait(lock, xt);
00130         }
00131 
00132         int cnt = queue.size();
00133         int i = cnt;
00134         while(i--) {
00135                 warn("run func");
00136                 func(*queue.begin());
00137                 warn("ok");
00138                 queue.pop_front();
00139         }
00140         
00141         warn("checkitems done: %d items.", cnt);
00142 }
00143 
00144 template <class T, class F, int TIMEOUT>
00145 void receiver<T,F,TIMEOUT>::Runner::operator()()
00146 {
00147         while(1)
00148         {
00149                 rec.checkItems();
00150 
00151                 boost::mutex::scoped_lock lock(rec.please_quit_mutex);
00152                 if(rec.please_quit)
00153                 {
00154                         warn("Please quit? -> ok.");
00155                         break;
00156                 }
00157         }
00158 
00159         rec.checkItems();
00160 }
00161 
00162 };  // namespace utils
00163 
00164 #undef warn
00165 
00166 #endif

Generated on Mon Oct 22 17:03:48 2001 for rfc822 by doxygen 1.2.11.1, written by Dimitri van Heesch, © 1997-2001