Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

zqueue.cpp

This example currently does not work; it needs a fix in ZMQ.

00001 
00009 #include "pthread.h"
00010 
00011 #include "ZmqMessage.hpp"
00012 
00019 //0mq context is globally available
00020 zmq::context_t ctx(1);
00021 
00022 // endpoint to pass data from main thread to worker and back
00023 const char* endpoint = "inproc://transport";
00024 
00025 // endpoint to publish stop signal
00026 const char* stop_endpoint = "inproc://stop";
00027 
00028 const char* to_worker_fields[] = {"message_type", "request_identifier"};
00029 const char* from_worker_fields[] = {"message_type", "response_identifier"};
00030 
00031 uint64_t one_message_queue = 1;
00032 
00033 // worker runs in a dedicated thread
00034 void*
00035 worker(void*)
00036 {
00037   try
00038   {
00039     zmq::socket_t s(ctx, ZMQ_REP);
00040     s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t));
00041 
00042     s.connect(endpoint);
00043     // socket to receive data from main thread and to send result back
00044     //   is connected
00045 
00046     zmq::socket_t ss(ctx, ZMQ_SUB);
00047     ss.setsockopt (ZMQ_SUBSCRIBE, "", 0);
00048     ss.connect(stop_endpoint);
00049     // socket to receive stop notifications from main thread and to send result back
00050     //   is connected
00051 
00052     zmq_pollitem_t item[2];
00053 
00054     item[0].socket = ss;
00055     item[0].events = ZMQ_POLLIN;
00056 
00057     item[1].socket = s;
00058     item[1].events = ZMQ_POLLIN | ZMQ_POLLOUT;
00059 
00060     std::auto_ptr<ZmqMessage::Multipart> queue;
00061 
00062     for(;;)
00063     {
00064       zmq::poll(item, sizeof(item) / sizeof(item[0]), 0);
00065 
00066       if (item[0].revents) // stop
00067       {
00068         std::cout << "stop" << std::endl;
00069         break;
00070       }
00071       else if (item[1].revents & ZMQ_POLLOUT)
00072       {
00073         std::cout << "POLLOUT" << std::endl;
00074         send(s, *queue.get(), true);
00075         queue.reset(0);
00076       }
00077       else if (item[1].revents & ZMQ_POLLIN)
00078       {
00079         std::cout << "POLLIN" << std::endl;
00080         ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00081         ingress.receive(2, to_worker_fields, 2, true);
00082 
00083         ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(
00084           ZmqMessage::OutOptions(
00085             s,
00086             ZmqMessage::OutOptions::CACHE_ON_BLOCK |
00087              ZmqMessage::OutOptions::NONBLOCK),
00088             ingress);
00089 
00090         egress << "response" << ingress[1] << ZmqMessage::Flush;
00091 
00092         if (egress.is_queued())
00093         {
00094           assert(!queue.get());
00095           std::cout << "is_queued" << std::endl;
00096           queue.reset(egress.detach());
00097         }
00098       }
00099     }
00100   }
00101   catch(const std::exception& ex)
00102   {
00103     std::cout << "caught (worker): " << ex.what();
00104     exit(3);
00105   }
00106   return 0;
00107 }
00108 
00109 int
00110 main(int, char**)
00111 {
00112   pthread_t worker_tid;
00113 
00114   zmq::socket_t s(ctx, ZMQ_XREQ);
00115   s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t));
00116   s.bind(endpoint);
00117   // socket to talk to worker is bound
00118 
00119   zmq::socket_t ss(ctx, ZMQ_PUB);
00120   ss.bind(stop_endpoint);
00121   // socket to talk to worker is bound
00122 
00123   try
00124   {
00125     // start worker
00126     pthread_create(&worker_tid, 0, worker, 0);
00127 
00128     ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker0(s, 0);
00129     to_worker0 << "request" << "#0" << ZmqMessage::Flush;
00130 
00131     sleep(1);
00132 
00133     ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker1(s, 0);
00134     to_worker1 << "request" << "#1" << ZmqMessage::Flush;
00135 
00136     ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker2(s, 0);
00137     // to_worker2 << "request" << "#2" << ZmqMessage::Flush;
00138 
00139     sleep(1);
00140 
00141     std::string msg_type;
00142     std::string msg_id;
00143 
00144     ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker0(s);
00145     from_worker0.receive(2, from_worker_fields, 2, true);
00146 
00147     from_worker0 >> msg_type >> msg_id;
00148     std::cout << msg_type << msg_id << "received by main thread" << std::endl;
00149 
00150     ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker1(s);
00151     from_worker1.receive(2, from_worker_fields, 2, true);
00152 
00153     from_worker1 >> msg_type >> msg_id;
00154     std::cout << msg_type << msg_id << "received by main thread" << std::endl;
00155 
00156     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(ss, 0);
00157     to_stop << "stop" << ZmqMessage::Flush;
00158   }
00159     catch(const std::exception& ex)
00160   {
00161     std::cout << "caught (main): " << ex.what();
00162     exit(3);
00163   }
00164 
00165   pthread_join(worker_tid, 0);
00166   // thread is completed and sockets are closed
00167 }
00168 
00169 
00170 
00171 
00172 
00173 

ZmqMessage 0.1 — open source software, support@zmqmessage.org