Overview Tutorial API reference Examples Build Download | ZmqMessage 0.1 - 21 Oct 2011 |
00001 00009 #include "pthread.h" 00010 00011 #include "ZmqMessage.hpp" 00012 00019 //0mq context is globally available 00020 zmq::context_t ctx(1); 00021 00022 // endpoint to pass data from main thread to worker and back 00023 const char* endpoint = "inproc://transport"; 00024 00025 // endpoint to publish stop signal 00026 const char* stop_endpoint = "inproc://stop"; 00027 00028 const char* to_worker_fields[] = {"message_type", "request_identifier"}; 00029 const char* from_worker_fields[] = {"message_type", "response_identifier"}; 00030 00031 uint64_t one_message_queue = 1; 00032 00033 // worker runs in a dedicated thread 00034 void* 00035 worker(void*) 00036 { 00037 try 00038 { 00039 zmq::socket_t s(ctx, ZMQ_REP); 00040 s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t)); 00041 00042 s.connect(endpoint); 00043 // socket to receive data from main thread and to send result back 00044 // is connected 00045 00046 zmq::socket_t ss(ctx, ZMQ_SUB); 00047 ss.setsockopt (ZMQ_SUBSCRIBE, "", 0); 00048 ss.connect(stop_endpoint); 00049 // socket to receive stop notifications from main thread and to send result back 00050 // is connected 00051 00052 zmq_pollitem_t item[2]; 00053 00054 item[0].socket = ss; 00055 item[0].events = ZMQ_POLLIN; 00056 00057 item[1].socket = s; 00058 item[1].events = ZMQ_POLLIN | ZMQ_POLLOUT; 00059 00060 std::auto_ptr<ZmqMessage::Multipart> queue; 00061 00062 for(;;) 00063 { 00064 zmq::poll(item, sizeof(item) / sizeof(item[0]), 0); 00065 00066 if (item[0].revents) // stop 00067 { 00068 std::cout << "stop" << std::endl; 00069 break; 00070 } 00071 else if (item[1].revents & ZMQ_POLLOUT) 00072 { 00073 std::cout << "POLLOUT" << std::endl; 00074 send(s, *queue.get(), true); 00075 queue.reset(0); 00076 } 00077 else if (item[1].revents & ZMQ_POLLIN) 00078 { 00079 std::cout << "POLLIN" << std::endl; 00080 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s); 00081 ingress.receive(2, to_worker_fields, 2, true); 00082 00083 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress( 00084 ZmqMessage::OutOptions( 00085 s, 00086 ZmqMessage::OutOptions::CACHE_ON_BLOCK | 00087 ZmqMessage::OutOptions::NONBLOCK), 00088 ingress); 00089 00090 egress << "response" << ingress[1] << ZmqMessage::Flush; 00091 00092 if (egress.is_queued()) 00093 { 00094 assert(!queue.get()); 00095 std::cout << "is_queued" << std::endl; 00096 queue.reset(egress.detach()); 00097 } 00098 } 00099 } 00100 } 00101 catch(const std::exception& ex) 00102 { 00103 std::cout << "caught (worker): " << ex.what(); 00104 exit(3); 00105 } 00106 return 0; 00107 } 00108 00109 int 00110 main(int, char**) 00111 { 00112 pthread_t worker_tid; 00113 00114 zmq::socket_t s(ctx, ZMQ_XREQ); 00115 s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t)); 00116 s.bind(endpoint); 00117 // socket to talk to worker is bound 00118 00119 zmq::socket_t ss(ctx, ZMQ_PUB); 00120 ss.bind(stop_endpoint); 00121 // socket to talk to worker is bound 00122 00123 try 00124 { 00125 // start worker 00126 pthread_create(&worker_tid, 0, worker, 0); 00127 00128 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker0(s, 0); 00129 to_worker0 << "request" << "#0" << ZmqMessage::Flush; 00130 00131 sleep(1); 00132 00133 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker1(s, 0); 00134 to_worker1 << "request" << "#1" << ZmqMessage::Flush; 00135 00136 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker2(s, 0); 00137 // to_worker2 << "request" << "#2" << ZmqMessage::Flush; 00138 00139 sleep(1); 00140 00141 std::string msg_type; 00142 std::string msg_id; 00143 00144 ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker0(s); 00145 from_worker0.receive(2, from_worker_fields, 2, true); 00146 00147 from_worker0 >> msg_type >> msg_id; 00148 std::cout << msg_type << msg_id << "received by main thread" << std::endl; 00149 00150 ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker1(s); 00151 from_worker1.receive(2, from_worker_fields, 2, true); 00152 00153 from_worker1 >> msg_type >> msg_id; 00154 std::cout << msg_type << msg_id << "received by main thread" << std::endl; 00155 00156 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(ss, 0); 00157 to_stop << "stop" << ZmqMessage::Flush; 00158 } 00159 catch(const std::exception& ex) 00160 { 00161 std::cout << "caught (main): " << ex.what(); 00162 exit(3); 00163 } 00164 00165 pthread_join(worker_tid, 0); 00166 // thread is completed and sockets are closed 00167 } 00168 00169 00170 00171 00172 00173