Overview Tutorial API reference Examples Build Download | ZmqMessage 0.1 - 21 Oct 2011 |
Characteristics of task processing:
The execution thread behaves as follows:
00001 00011 #include "pthread.h" 00012 #include "stdint.h" 00013 00014 #include <vector> 00015 #include "ZmqMessage.hpp" 00016 00047 //0mq context is globally available 00048 zmq::context_t ctx(1); 00049 00050 // endpoints to pass data from main thread to worker and back 00051 const char* req_endpoint = "inproc://req_ep"; 00052 const char* res_endpoint = "inproc://res_ep"; 00053 00054 // endpoint to publish stop signal, we shouldn't miss it! 00055 const char* stop_endpoint = "inproc://stop"; 00056 00057 const char* to_worker_fields[] = {"message_type", "task_identifier"}; 00058 const char* from_worker_fields[] = {"message_type", "task_identifier"}; 00059 00060 uint64_t message_queue_limit = 5; 00061 int max_task_steps = 10; 00062 00063 //we emulate asynchronous tasks by decrementing counter on each poll timeout event 00064 struct async_task 00065 { 00066 int id; 00067 int remaining_steps; 00068 }; 00069 00070 typedef std::vector<async_task> TaskVec; 00071 00072 void task_step(async_task& task) 00073 { 00074 --task.remaining_steps; 00075 } 00076 00077 template <typename QueueInserter> 00078 void run_tasks(TaskVec& tasks, zmq::socket_t& s_res, QueueInserter q) 00079 { 00080 std::for_each(tasks.begin(), tasks.end(), &task_step); 00081 for (TaskVec::iterator it = tasks.begin(); it != tasks.end(); ) 00082 { 00083 async_task& task = *it; 00084 if (task.remaining_steps == 0) 00085 { 00086 std::cout << " task " << task.id << " done" << std::endl; 00087 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress( 00088 ZmqMessage::OutOptions( 00089 s_res, ZmqMessage::OutOptions::CACHE_ON_BLOCK | 00090 ZmqMessage::OutOptions::NONBLOCK)); 00091 00092 egress << "finished" << task.id << ZmqMessage::Flush; 00093 00094 if (egress.is_queued()) 00095 { 00096 std::cout << " is_queued" << std::endl; 00097 *q = egress.detach(); 00098 } 00099 it = tasks.erase(it); 00100 } 00101 else 00102 { 00103 ++it; 00104 } 00105 } 00106 } 00107 00108 // async task processor, runs in a dedicated thread 
00109 //Receives 'tasks' designated by id. Each task will take random (1-max_task_steps) steps to complete. 00110 //so 'finished' responses will come in arbitrary order 00111 void* 00112 async_task_processor(void*) 00113 { 00114 try 00115 { 00116 zmq::socket_t s_req(ctx, ZMQ_PULL); 00117 zmq::socket_t s_res(ctx, ZMQ_PUSH); 00118 uint64_t one_lim = 1; 00119 s_req.setsockopt(ZMQ_HWM, &one_lim, sizeof(uint64_t)); 00120 s_res.setsockopt(ZMQ_HWM, &one_lim, sizeof(uint64_t)); 00121 00122 s_req.connect(req_endpoint); 00123 s_res.connect(res_endpoint); 00124 00125 zmq::socket_t ss(ctx, ZMQ_SUB); 00126 ss.setsockopt (ZMQ_SUBSCRIBE, "", 0); 00127 ss.connect(stop_endpoint); 00128 // socket to receive stop notifications from main thread and to send result back 00129 // is connected 00130 00131 zmq_pollitem_t item[3]; //stop, in, out 00132 00133 item[0].socket = ss; 00134 item[0].events = ZMQ_POLLIN; 00135 00136 item[1].socket = s_req; 00137 item[1].events = ZMQ_POLLIN; 00138 00139 item[2].socket = s_res; 00140 item[2].events = ZMQ_POLLOUT; 00141 00142 std::vector<ZmqMessage::Multipart*> queue; 00143 00144 TaskVec tasks; 00145 00146 for(;;) 00147 { 00148 //have something to send 00149 item[2].events = (queue.empty()) ? 0 : ZMQ_POLLOUT; 00150 //do not receive more tasks until we send pending responses 00151 item[1].events = (queue.empty()) ? 
ZMQ_POLLIN : 0; 00152 00153 int res = zmq::poll(item, sizeof(item) / sizeof(item[0]), 1000000); //1s 00154 00155 if (res == 0) 00156 { 00157 //timeout 00158 std::cout << "RUN TASKS: " << tasks.size() << std::endl; 00159 run_tasks(tasks, s_res, std::back_inserter(queue)); 00160 continue; 00161 } 00162 if (item[0].revents) // stop 00163 { 00164 std::cout << " stop" << std::endl; 00165 break; 00166 } 00167 else if ((item[2].revents & ZMQ_POLLOUT) && (item[2].revents & ZMQ_POLLOUT)) 00168 { 00169 std::cout << "POLLOUT, sending" << std::endl; 00170 std::auto_ptr<ZmqMessage::Multipart> m(queue.back()); //no throw 00171 queue.back() = 0; //no throw 00172 queue.pop_back(); 00173 ZmqMessage::send(s_res, *m, true); 00174 } 00175 else if ((item[1].events & ZMQ_POLLIN) && (item[1].revents & ZMQ_POLLIN)) 00176 { 00177 std::cout << "POLLIN, new task" << std::endl; 00178 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s_req); 00179 ingress.receive(2, to_worker_fields, 2, true); 00180 00181 std::string tmp; 00182 async_task task; 00183 ingress >> tmp >> task.id; 00184 00185 task.remaining_steps = (rand() % (max_task_steps - 1)) + 1; //1-max_task_steps 00186 00187 tasks.push_back(task); 00188 } 00189 } 00190 } 00191 catch(const std::exception& ex) 00192 { 00193 std::cout << "caught (processor): " << ex.what(); 00194 exit(3); 00195 } 00196 return 0; 00197 } 00198 00199 00200 int 00201 main(int, char**) 00202 { 00203 pthread_t worker_tid; 00204 pthread_t sender_tid; 00205 00206 zmq::socket_t s_req(ctx, ZMQ_PUSH); 00207 zmq::socket_t s_res(ctx, ZMQ_PULL); 00208 s_req.setsockopt(ZMQ_HWM, &message_queue_limit, sizeof(uint64_t)); 00209 s_res.setsockopt(ZMQ_HWM, &message_queue_limit, sizeof(uint64_t)); 00210 00211 s_req.bind(req_endpoint); 00212 s_res.bind(res_endpoint); 00213 00214 zmq::socket_t ss(ctx, ZMQ_PUB); 00215 ss.bind(stop_endpoint); 00216 // socket to talk to worker is bound 00217 00218 try 00219 { 00220 // start worker 00221 pthread_create(&worker_tid, 0, 
async_task_processor, 0); 00222 sleep(1); 00223 00224 for (int i = 0; i < message_queue_limit+2; ++i) 00225 { 00226 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker(s_req, ZmqMessage::OutOptions::NONBLOCK); 00227 to_worker << "request" << i << ZmqMessage::Flush; 00228 usleep(100000); 00229 } 00230 00231 std::cout << "1:requests sent: " << (message_queue_limit+2) << 00232 ", sleeping " << max_task_steps << std::endl; 00233 sleep(max_task_steps + 1); 00234 00235 //all tasks are processed, res queue is full, 1 remaining response is cached 00236 //so no new requests is accepted 00237 for (int i = message_queue_limit+2; i < 2*message_queue_limit+2; ++i) 00238 { 00239 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker(s_req, ZmqMessage::OutOptions::NONBLOCK); 00240 to_worker << "request" << i << ZmqMessage::Flush; 00241 usleep(100000); 00242 } 00243 std::cout << "2:requests sent: " << message_queue_limit << std::endl; 00244 00245 //req queue filled 00246 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker( 00247 s_req, ZmqMessage::OutOptions::NONBLOCK | ZmqMessage::OutOptions::DROP_ON_BLOCK); 00248 to_worker << "request" << (2*message_queue_limit+2) << ZmqMessage::Flush; 00249 assert(to_worker.is_dropping()); 00250 00251 //read all 00252 std::cout << "reading..." 
<< std::endl; 00253 00254 std::string msg_type; 00255 std::string msg_id; 00256 00257 for (int i = 0; i < 2*message_queue_limit; ++i) 00258 { 00259 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> from_worker0(s_res); 00260 from_worker0.receive(2, from_worker_fields, 2, true); 00261 00262 from_worker0 >> msg_type >> msg_id; 00263 std::cout << msg_type << msg_id << "received by main thread" << std::endl; 00264 } 00265 00266 //stop 00267 00268 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(ss, 0); 00269 to_stop << "stop" << ZmqMessage::Flush; 00270 } 00271 catch(const std::exception& ex) 00272 { 00273 std::cout << "caught (main): " << ex.what(); 00274 exit(3); 00275 } 00276 00277 pthread_join(worker_tid, 0); 00278 // pthread_join(sender_tid, 0); 00279 // thread is completed and sockets are closed 00280 } 00281 00282 00283 00284 00285 00286