00001 #ifndef UTILS_RECEIVER_H
00002 #define UTILS_RECEIVER_H
00003
00004 #include <list>
00005
00006 #include <boost/thread/xtime.hpp>
00007 #include <boost/thread/condition.hpp>
00008 #include <boost/thread/thread.hpp>
00009
00010 #define warn nowarn // ignore warnings
00011
00012 using namespace std;
00013
00014 namespace utils {
00015
00017
00024 template <class T, class F, int TIMEOUT = 100>
00025 class receiver: private boost::noncopyable
00026 {
00027 F func;
00028
00029 list<T> queue;
00030
00031 boost::mutex mutex;
00032 boost::condition not_empty;
00033
00034 bool please_quit;
00035 boost::mutex please_quit_mutex;
00036
00037 boost::thread thread;
00038 public:
00040
00045 receiver();
00046
00048
00053 void shutdown();
00054
00056
00063 void add(const T& v);
00064
00065 private:
00066
00067 void checkItems();
00068
00069 friend class Runner;
00070
00071 class Runner
00072 {
00073 public:
00074 receiver& rec;
00075
00076 Runner(receiver& r): rec(r)
00077 { }
00078
00079 void operator()();
00080 };
00081 };
00082
00083 template <class T, class F, int TIMEOUT>
00084 receiver<T,F,TIMEOUT>::receiver(): please_quit(false), thread(Runner(*this))
00085 { }
00086
00087 template <class T, class F, int TIMEOUT>
00088 void receiver<T,F,TIMEOUT>::shutdown()
00089 {
00090 {
00091 warn("please quit.");
00092 boost::mutex::scoped_lock lock(please_quit_mutex);
00093 please_quit = true;
00094 }
00095
00096 thread.join();
00097 }
00098
00099
00100
00101 template <class T, class F, int TIMEOUT>
00102 void receiver<T,F,TIMEOUT>::add(const T& v)
00103 {
00104 warn("adding an item.");
00105 boost::mutex::scoped_lock lock(mutex);
00106 queue.push_back(v);
00107
00108 not_empty.notify_one();
00109 }
00110
00111
00112 template <class T, class F, int TIMEOUT>
00113 void receiver<T,F,TIMEOUT>::checkItems()
00114 {
00115 warn("checkitems");
00116 boost::mutex::scoped_lock lock(mutex);
00117 if(queue.size() == 0)
00118 {
00119 warn("checkitems: waiting for not_empty");
00120 boost::xtime xt;
00121 boost::xtime_get(&xt, boost::TIME_UTC);
00122 xt.sec += TIMEOUT / 1000;
00123 xt.nsec += (TIMEOUT % 1000) * 1000000;
00124 if(xt.nsec >= 1000000000) {
00125 xt.sec += 1;
00126 xt.nsec -= 1000000000;
00127 }
00128
00129 not_empty.timed_wait(lock, xt);
00130 }
00131
00132 int cnt = queue.size();
00133 int i = cnt;
00134 while(i--) {
00135 warn("run func");
00136 func(*queue.begin());
00137 warn("ok");
00138 queue.pop_front();
00139 }
00140
00141 warn("checkitems done: %d items.", cnt);
00142 }
00143
00144 template <class T, class F, int TIMEOUT>
00145 void receiver<T,F,TIMEOUT>::Runner::operator()()
00146 {
00147 while(1)
00148 {
00149 rec.checkItems();
00150
00151 boost::mutex::scoped_lock lock(rec.please_quit_mutex);
00152 if(rec.please_quit)
00153 {
00154 warn("Please quit? -> ok.");
00155 break;
00156 }
00157 }
00158
00159 rec.checkItems();
00160 }
00161
00162 };
00163
00164 #undef warn
00165
00166 #endif