Overview Tutorial API reference Examples Build Download | ZmqMessage 0.1 - 21 Oct 2011 |
00001 00009 #include <pthread.h> 00010 #include <vector> 00011 00012 #include <zmq.hpp> 00013 00014 #include "ZmqMessage.hpp" 00015 00023 //0mq context is globally available 00024 zmq::context_t ctx(1); 00025 00026 // endpoint to pass data from main thread to worker and back 00027 const char* endpoint = "inproc://transport"; 00028 00029 struct SomeData 00030 { 00031 int f1; 00032 char f2[20]; 00033 }; 00034 00035 // it is possible to send SomeData objects 00036 // in text form which require serialization 00037 // and desirialization methods. 00038 00039 00040 template <class charT, class traits> 00041 std::basic_ostream<charT, traits>& 00042 operator<<(std::basic_ostream<charT, traits>& strm, 00043 const SomeData& d) 00044 { 00045 strm << d.f1 << ':' << d.f2; 00046 return strm; 00047 } 00048 00049 template <class charT, class traits> 00050 std::basic_istream<charT, traits>& 00051 operator>>(std::basic_istream<charT, traits>& strm, 00052 SomeData& d) 00053 { 00054 char ignore; 00055 strm >> d.f1 >> ignore >> d.f2; 00056 return strm; 00057 } 00058 00059 // to send in binary form 00060 // because of "raw_mark" tag 00061 struct SomeBinaryData 00062 { 00063 typedef void raw_mark; 00064 00065 int f1; 00066 char f2[20]; 00067 }; 00068 00069 // to send in binary form 00070 ZMQMESSAGE_BINARY_TYPE(double) 00071 ; 00072 00073 const char* to_worker_fields[] = {"text", "structure", "numeric"}; 00074 00075 // worker runs in a dedicated thread 00076 void* 00077 receiver(void*) 00078 { 00079 try 00080 { 00081 zmq::socket_t s(ctx, ZMQ_PULL); 00082 s.connect(endpoint); 00083 // socket to receive data from main thread 00084 // is connected 00085 00086 typedef ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ZIn; 00087 00088 std::string text; 00089 SomeData data; 00090 int num; 00091 00092 // receive data 00093 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> data >> num; 00094 // and print it out 00095 std::cout << text << ' ' << data << num << ", numeric " << std::endl; 00096 00097 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00098 ZmqMessage::Binary>> data >> num; 00099 std::cout << text << ' ' << data << num << ", numeric " << std::endl; 00100 00101 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00102 ZmqMessage::Binary>> data >> ZmqMessage::Text >> num; 00103 std::cout << text << ' ' << data << num << ", numeric " << std::endl; 00104 00105 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00106 data >> ZmqMessage::Text >> num; 00107 std::cout << text << ' ' << data << ", numeric " << num << std::endl; 00108 00109 SomeBinaryData binary_data; 00110 00111 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00112 binary_data >> ZmqMessage::Text >> num; 00113 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 << 00114 ", numeric " << num << std::endl; 00115 00116 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00117 ZmqMessage::Binary >> binary_data >> ZmqMessage::Text >> num; 00118 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 << 00119 ", numeric " << num << std::endl; 00120 00121 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00122 binary_data >> ZmqMessage::Text >> num; 00123 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 << 00124 ", numeric " << num << std::endl; 00125 00126 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00127 binary_data >> ZmqMessage::Binary >> num; 00128 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 << 00129 ", numeric " << num << std::endl; 00130 00131 double double_num; 00132 00133 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> 00134 binary_data >> double_num; 00135 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 << 00136 ", numeric " << double_num << std::endl; 00137 00138 } 00139 catch(const std::exception& ex) 00140 { 00141 std::cout << "caught (worker): " << ex.what(); 00142 exit(3); 00143 } 00144 00145 return 0; 00146 } 00147 00148 int 00149 main(int, char**) 00150 { 00151 pthread_t worker_tid; 00152 00153 zmq::socket_t s(ctx, ZMQ_PUSH); 00154 s.bind(endpoint); 00155 // socket to talk to worker is bound 00156 00157 try 00158 { 00159 // start worker 00160 pthread_create(&worker_tid, 0, receiver, 0); 00161 00162 SomeData data; 00163 data.f1 = 123; 00164 strcpy(data.f2, "a string"); 00165 00166 typedef ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> ZOut; 00167 00168 ZOut(s, 0) << "Sent in default mode" << data << 100 << 00169 ZmqMessage::Flush; 00170 00171 ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data << 100 << 00172 ZmqMessage::Flush;; 00173 00174 ZOut(s, 0) << "Structure in binary mode, numeric as text" << 00175 ZmqMessage::Binary << data << ZmqMessage::Text << 100 << 00176 ZmqMessage::Flush; 00177 00178 ZOut(s, 0) << "Structure in default (text) mode, numeric as binary" << 00179 data << ZmqMessage::Binary << 100 << ZmqMessage::Flush;; 00180 00181 SomeBinaryData binary_data; 00182 binary_data.f1 = 123; 00183 strcpy(binary_data.f2, "another string"); 00184 00185 ZOut(s, 0) << "Sent in default mode" << binary_data << 100 << 00186 ZmqMessage::Flush; 00187 00188 ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data << 00189 100 << ZmqMessage::Flush; 00190 00191 ZOut(s, 0) << "Structure in binary mode, numeric as text" << 00192 ZmqMessage::Binary << data << ZmqMessage::Text << 100 << 00193 ZmqMessage::Flush; 00194 00195 ZOut(s, 0) << "Structure in default (binary) mode, numeric as binary" << 00196 data << ZmqMessage::Binary << 100 << ZmqMessage::Flush; 00197 00198 ZOut(s, 0) << "Structure in default (binary) mode, " 00199 "numeric as binary too" << 00200 data << (double)100.1 << ZmqMessage::Flush; 00201 } 00202 catch(const std::exception& ex) 00203 { 00204 std::cout << "caught (main): " << ex.what(); 00205 exit(3); 00206 } 00207 00208 pthread_join(worker_tid, 0); 00209 // thread is completed and sockets are closed 00210 } 00211