Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

zasync.cpp

This example emulates the processing of asynchronous tasks in a separate thread. Tasks and their results are received and sent via ZMQ. An HWM (high-water mark), together with queueing and delayed sending, is used to avoid congesting or blocking the execution thread and to avoid losing any processing results.

Characteristics of task processing:

In real life, a task would be the execution of a series of remote requests, the launching of processes, I/O operations, and so on.

The execution thread behaves as follows:

00001 
00011 #include "pthread.h"
00012 #include "stdint.h"
00013 
00014 #include <vector>
00015 #include "ZmqMessage.hpp"
00016 
00047 //0mq context is globally available
00048 zmq::context_t ctx(1);
00049 
00050 // endpoints to pass data from main thread to worker and back
00051 const char* req_endpoint = "inproc://req_ep";
00052 const char* res_endpoint = "inproc://res_ep";
00053 
00054 // endpoint to publish stop signal, we shouldn't miss it!
00055 const char* stop_endpoint = "inproc://stop";
00056 
00057 const char* to_worker_fields[] = {"message_type", "task_identifier"};
00058 const char* from_worker_fields[] = {"message_type", "task_identifier"};
00059 
00060 uint64_t message_queue_limit = 5;
00061 int max_task_steps = 10;
00062 
// An emulated asynchronous task. Progress is simulated by decrementing
// a counter on each poll timeout event in the worker thread.
struct async_task
{
  // identifier received with the request and echoed in the response
  int id;
  // number of steps left before the task is considered done
  int remaining_steps;
};
00069 
00070 typedef std::vector<async_task> TaskVec;
00071 
00072 void task_step(async_task& task)
00073 {
00074   --task.remaining_steps;
00075 }
00076 
00077 template <typename QueueInserter>
00078 void run_tasks(TaskVec& tasks, zmq::socket_t& s_res, QueueInserter q)
00079 {
00080   std::for_each(tasks.begin(), tasks.end(), &task_step);
00081   for (TaskVec::iterator it = tasks.begin(); it != tasks.end(); )
00082   {
00083     async_task& task = *it;
00084     if (task.remaining_steps == 0)
00085     {
00086       std::cout << " task " << task.id << " done" << std::endl;
00087       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(
00088         ZmqMessage::OutOptions(
00089           s_res, ZmqMessage::OutOptions::CACHE_ON_BLOCK |
00090           ZmqMessage::OutOptions::NONBLOCK));
00091 
00092       egress << "finished" << task.id << ZmqMessage::Flush;
00093 
00094       if (egress.is_queued())
00095       {
00096         std::cout << " is_queued" << std::endl;
00097         *q = egress.detach();
00098       }
00099       it = tasks.erase(it);
00100     }
00101     else
00102     {
00103       ++it;
00104     }
00105   }
00106 }
00107 
00108 // async task processor, runs in a dedicated thread
00109 //Receives 'tasks' designated by id. Each task will take random (1-max_task_steps) steps to complete.
00110 //so 'finished' responses will come in arbitrary order
00111 void*
00112 async_task_processor(void*)
00113 {
00114   try
00115   {
00116     zmq::socket_t s_req(ctx, ZMQ_PULL);
00117     zmq::socket_t s_res(ctx, ZMQ_PUSH);
00118     uint64_t one_lim = 1;
00119     s_req.setsockopt(ZMQ_HWM, &one_lim, sizeof(uint64_t));
00120     s_res.setsockopt(ZMQ_HWM, &one_lim, sizeof(uint64_t));
00121 
00122     s_req.connect(req_endpoint);
00123     s_res.connect(res_endpoint);
00124 
00125     zmq::socket_t ss(ctx, ZMQ_SUB);
00126     ss.setsockopt (ZMQ_SUBSCRIBE, "", 0);
00127     ss.connect(stop_endpoint);
00128     // socket to receive stop notifications from main thread and to send result back
00129     //   is connected
00130 
00131     zmq_pollitem_t item[3]; //stop, in, out
00132 
00133     item[0].socket = ss;
00134     item[0].events = ZMQ_POLLIN;
00135 
00136     item[1].socket = s_req;
00137     item[1].events = ZMQ_POLLIN;
00138 
00139     item[2].socket = s_res;
00140     item[2].events = ZMQ_POLLOUT;
00141 
00142     std::vector<ZmqMessage::Multipart*> queue;
00143 
00144     TaskVec tasks;
00145 
00146     for(;;)
00147     {
00148       //have something to send
00149       item[2].events = (queue.empty()) ? 0 : ZMQ_POLLOUT;
00150       //do not receive more tasks until we send pending responses
00151       item[1].events = (queue.empty()) ? ZMQ_POLLIN : 0;
00152 
00153       int res = zmq::poll(item, sizeof(item) / sizeof(item[0]), 1000000); //1s
00154 
00155       if (res == 0)
00156       {
00157         //timeout
00158         std::cout << "RUN TASKS: " << tasks.size() << std::endl;
00159         run_tasks(tasks, s_res, std::back_inserter(queue));
00160         continue;
00161       }
00162       if (item[0].revents) // stop
00163       {
00164         std::cout << " stop" << std::endl;
00165         break;
00166       }
00167       else if ((item[2].revents & ZMQ_POLLOUT) && (item[2].revents & ZMQ_POLLOUT))
00168       {
00169         std::cout << "POLLOUT, sending" << std::endl;
00170         std::auto_ptr<ZmqMessage::Multipart> m(queue.back()); //no throw
00171         queue.back() = 0; //no throw
00172         queue.pop_back();
00173         ZmqMessage::send(s_res, *m, true);
00174       }
00175       else if ((item[1].events & ZMQ_POLLIN) && (item[1].revents & ZMQ_POLLIN))
00176       {
00177         std::cout << "POLLIN, new task" << std::endl;
00178         ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s_req);
00179         ingress.receive(2, to_worker_fields, 2, true);
00180 
00181         std::string tmp;
00182         async_task task;
00183         ingress >> tmp >> task.id;
00184 
00185         task.remaining_steps = (rand() % (max_task_steps - 1)) + 1; //1-max_task_steps
00186 
00187         tasks.push_back(task);
00188       }
00189     }
00190   }
00191   catch(const std::exception& ex)
00192   {
00193     std::cout << "caught (processor): " << ex.what();
00194     exit(3);
00195   }
00196   return 0;
00197 }
00198 
00199 
00200 int
00201 main(int, char**)
00202 {
00203   pthread_t worker_tid;
00204   pthread_t sender_tid;
00205 
00206   zmq::socket_t s_req(ctx, ZMQ_PUSH);
00207   zmq::socket_t s_res(ctx, ZMQ_PULL);
00208   s_req.setsockopt(ZMQ_HWM, &message_queue_limit, sizeof(uint64_t));
00209   s_res.setsockopt(ZMQ_HWM, &message_queue_limit, sizeof(uint64_t));
00210 
00211   s_req.bind(req_endpoint);
00212   s_res.bind(res_endpoint);
00213 
00214   zmq::socket_t ss(ctx, ZMQ_PUB);
00215   ss.bind(stop_endpoint);
00216   // socket to talk to worker is bound
00217 
00218   try
00219   {
00220     // start worker
00221     pthread_create(&worker_tid, 0, async_task_processor, 0);
00222     sleep(1);
00223 
00224     for (int i = 0; i < message_queue_limit+2; ++i)
00225     {
00226       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker(s_req, ZmqMessage::OutOptions::NONBLOCK);
00227       to_worker << "request" << i << ZmqMessage::Flush;
00228       usleep(100000);
00229     }
00230 
00231     std::cout << "1:requests sent: " << (message_queue_limit+2) <<
00232       ", sleeping " << max_task_steps << std::endl;
00233     sleep(max_task_steps + 1);
00234 
00235     //all tasks are processed, res queue is full, 1 remaining response is cached
00236     //so no new requests is accepted
00237     for (int i = message_queue_limit+2; i < 2*message_queue_limit+2; ++i)
00238     {
00239       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker(s_req, ZmqMessage::OutOptions::NONBLOCK);
00240       to_worker << "request" << i << ZmqMessage::Flush;
00241       usleep(100000);
00242     }
00243     std::cout << "2:requests sent: " << message_queue_limit << std::endl;
00244 
00245     //req queue filled
00246     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_worker(
00247       s_req, ZmqMessage::OutOptions::NONBLOCK | ZmqMessage::OutOptions::DROP_ON_BLOCK);
00248     to_worker << "request" << (2*message_queue_limit+2) << ZmqMessage::Flush;
00249     assert(to_worker.is_dropping());
00250 
00251     //read all
00252     std::cout << "reading..." << std::endl;
00253 
00254     std::string msg_type;
00255     std::string msg_id;
00256 
00257     for (int i = 0; i < 2*message_queue_limit; ++i)
00258     {
00259       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> from_worker0(s_res);
00260       from_worker0.receive(2, from_worker_fields, 2, true);
00261 
00262       from_worker0 >> msg_type >> msg_id;
00263       std::cout << msg_type << msg_id << "received by main thread" << std::endl;
00264     }
00265 
00266     //stop
00267 
00268     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(ss, 0);
00269     to_stop << "stop" << ZmqMessage::Flush;
00270   }
00271     catch(const std::exception& ex)
00272   {
00273     std::cout << "caught (main): " << ex.what();
00274     exit(3);
00275   }
00276 
00277   pthread_join(worker_tid, 0);
00278 //  pthread_join(sender_tid, 0);
00279   // thread is completed and sockets are closed
00280 }
00281 
00282 
00283 
00284 
00285 
00286 

ZmqMessage 0.1 — open source software, support@zmqmessage.org