00001
00009 #include "pthread.h"
00010
00011 #include "ZmqMessage.hpp"
00012
00019
00020 zmq::context_t ctx(1);
00021
00022
00023 const char* endpoint = "inproc://transport";
00024
00025
00026 const char* stop_endpoint = "inproc://stop";
00027
00028 const char* to_worker_fields[] = {"message_type", "request_identifier"};
00029 const char* from_worker_fields[] = {"message_type", "response_identifier"};
00030
00031 uint64_t one_message_queue = 1;
00032
00033
00034 void*
00035 worker(void*)
00036 {
00037 try
00038 {
00039 zmq::socket_t s(ctx, ZMQ_REP);
00040 s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t));
00041
00042 s.connect(endpoint);
00043
00044
00045
00046 zmq::socket_t ss(ctx, ZMQ_SUB);
00047 ss.setsockopt (ZMQ_SUBSCRIBE, "", 0);
00048 ss.connect(stop_endpoint);
00049
00050
00051
00052 zmq_pollitem_t item[2];
00053
00054 item[0].socket = ss;
00055 item[0].events = ZMQ_POLLIN;
00056
00057 item[1].socket = s;
00058 item[1].events = ZMQ_POLLIN | ZMQ_POLLOUT;
00059
00060 std::auto_ptr<ZmqMessage::Multipart> queue;
00061
00062 for(;;)
00063 {
00064 zmq::poll(item, sizeof(item) / sizeof(item[0]), 0);
00065
00066 if (item[0].revents)
00067 {
00068 std::cout << "stop" << std::endl;
00069 break;
00070 }
00071 else if (item[1].revents & ZMQ_POLLOUT)
00072 {
00073 std::cout << "POLLOUT" << std::endl;
00074 send(s, *queue.get(), true);
00075 queue.reset(0);
00076 }
00077 else if (item[1].revents & ZMQ_POLLIN)
00078 {
00079 std::cout << "POLLIN" << std::endl;
00080 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00081 ingress.receive(2, to_worker_fields, 2, true);
00082
00083 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(
00084 ZmqMessage::OutOptions(
00085 s,
00086 ZmqMessage::OutOptions::CACHE_ON_BLOCK |
00087 ZmqMessage::OutOptions::NONBLOCK),
00088 ingress);
00089
00090 egress << "response" << ingress[1] << ZmqMessage::Flush;
00091
00092 if (egress.is_queued())
00093 {
00094 assert(!queue.get());
00095 std::cout << "is_queued" << std::endl;
00096 queue.reset(egress.detach());
00097 }
00098 }
00099 }
00100 }
00101 catch(const std::exception& ex)
00102 {
00103 std::cout << "caught (worker): " << ex.what();
00104 exit(3);
00105 }
00106 return 0;
00107 }
00108
00109 int
00110 main(int, char**)
00111 {
00112 pthread_t worker_tid;
00113
00114 zmq::socket_t s(ctx, ZMQ_XREQ);
00115 s.setsockopt(ZMQ_HWM, &one_message_queue, sizeof(uint64_t));
00116 s.bind(endpoint);
00117
00118
00119 zmq::socket_t ss(ctx, ZMQ_PUB);
00120 ss.bind(stop_endpoint);
00121
00122
00123 try
00124 {
00125
00126 pthread_create(&worker_tid, 0, worker, 0);
00127
00128 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker0(s, 0);
00129 to_worker0 << "request" << "#0" << ZmqMessage::Flush;
00130
00131 sleep(1);
00132
00133 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker1(s, 0);
00134 to_worker1 << "request" << "#1" << ZmqMessage::Flush;
00135
00136 ZmqMessage::Outgoing<ZmqMessage::XRouting> to_worker2(s, 0);
00137
00138
00139 sleep(1);
00140
00141 std::string msg_type;
00142 std::string msg_id;
00143
00144 ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker0(s);
00145 from_worker0.receive(2, from_worker_fields, 2, true);
00146
00147 from_worker0 >> msg_type >> msg_id;
00148 std::cout << msg_type << msg_id << "received by main thread" << std::endl;
00149
00150 ZmqMessage::Incoming<ZmqMessage::XRouting> from_worker1(s);
00151 from_worker1.receive(2, from_worker_fields, 2, true);
00152
00153 from_worker1 >> msg_type >> msg_id;
00154 std::cout << msg_type << msg_id << "received by main thread" << std::endl;
00155
00156 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(ss, 0);
00157 to_stop << "stop" << ZmqMessage::Flush;
00158 }
00159 catch(const std::exception& ex)
00160 {
00161 std::cout << "caught (main): " << ex.what();
00162 exit(3);
00163 }
00164
00165 pthread_join(worker_tid, 0);
00166
00167 }
00168
00169
00170
00171
00172
00173