00001
00009 #include <pthread.h>
00010 #include <vector>
00011
00012 #include <zmq.hpp>
00013
00014 #include "ZmqMessage.hpp"
00015
00023
00024 zmq::context_t ctx(1);
00025
00026
00027 const char* endpoint = "inproc://transport";
00028
/**
 * Sample payload that is (de)serialized through the stream operators below
 * when it travels in text form.
 */
struct SomeData
{
  int f1;
  char f2[20]; // NUL-terminated C string, at most 19 characters of payload
};

/**
 * Writes @p d as "<f1>:<f2>".
 *
 * @param strm  destination stream
 * @param d     value to write; d.f2 must be NUL-terminated
 * @return      @p strm, to allow chaining
 */
template <class charT, class traits>
std::basic_ostream<charT, traits>&
operator<<(std::basic_ostream<charT, traits>& strm,
  const SomeData& d)
{
  return strm << d.f1 << ':' << d.f2;
}

/**
 * Reads a value written by the operator<< above: an integer, one separator
 * character (its value is not checked) and a whitespace-delimited token
 * stored into d.f2.
 *
 * The token extraction is limited to the capacity of d.f2. Without the
 * width limit an input token of 20 or more characters would be written
 * past the end of the array.
 *
 * @param strm  source stream
 * @param d     object to fill
 * @return      @p strm, to allow chaining
 */
template <class charT, class traits>
std::basic_istream<charT, traits>&
operator>>(std::basic_istream<charT, traits>& strm,
  SomeData& d)
{
  char separator;
  if (strm >> d.f1 >> separator)
  {
    // width(n) makes the character-array extractor store at most n-1
    // characters plus the terminating NUL; the extractor resets it to 0.
    strm.width(static_cast<std::streamsize>(sizeof(d.f2)));
    strm >> d.f2;
  }
  return strm;
}
00058
00059
00060
/**
 * Same fields as SomeData, but tagged with a nested raw_mark typedef.
 * NOTE(review): raw_mark is presumably the marker ZmqMessage looks for to
 * treat a type as raw binary regardless of the stream mode - confirm
 * against the library documentation.
 */
struct SomeBinaryData
{
  typedef void raw_mark;

  int f1;
  char f2[20]; // NUL-terminated C string
};
00068
00069
00070 ZMQMESSAGE_BINARY_TYPE(double)
00071 ;
00072
00073 const char* to_worker_fields[] = {"text", "structure", "numeric"};
00074
00075
00076 void*
00077 receiver(void*)
00078 {
00079 try
00080 {
00081 zmq::socket_t s(ctx, ZMQ_PULL);
00082 s.connect(endpoint);
00083
00084
00085
00086 typedef ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ZIn;
00087
00088 std::string text;
00089 SomeData data;
00090 int num;
00091
00092
00093 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> data >> num;
00094
00095 std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00096
00097 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00098 ZmqMessage::Binary>> data >> num;
00099 std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00100
00101 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00102 ZmqMessage::Binary>> data >> ZmqMessage::Text >> num;
00103 std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00104
00105 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00106 data >> ZmqMessage::Text >> num;
00107 std::cout << text << ' ' << data << ", numeric " << num << std::endl;
00108
00109 SomeBinaryData binary_data;
00110
00111 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00112 binary_data >> ZmqMessage::Text >> num;
00113 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00114 ", numeric " << num << std::endl;
00115
00116 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00117 ZmqMessage::Binary >> binary_data >> ZmqMessage::Text >> num;
00118 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00119 ", numeric " << num << std::endl;
00120
00121 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00122 binary_data >> ZmqMessage::Text >> num;
00123 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00124 ", numeric " << num << std::endl;
00125
00126 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00127 binary_data >> ZmqMessage::Binary >> num;
00128 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00129 ", numeric " << num << std::endl;
00130
00131 double double_num;
00132
00133 ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00134 binary_data >> double_num;
00135 std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00136 ", numeric " << double_num << std::endl;
00137
00138 }
00139 catch(const std::exception& ex)
00140 {
00141 std::cout << "caught (worker): " << ex.what();
00142 exit(3);
00143 }
00144
00145 return 0;
00146 }
00147
00148 int
00149 main(int, char**)
00150 {
00151 pthread_t worker_tid;
00152
00153 zmq::socket_t s(ctx, ZMQ_PUSH);
00154 s.bind(endpoint);
00155
00156
00157 try
00158 {
00159
00160 pthread_create(&worker_tid, 0, receiver, 0);
00161
00162 SomeData data;
00163 data.f1 = 123;
00164 strcpy(data.f2, "a string");
00165
00166 typedef ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> ZOut;
00167
00168 ZOut(s, 0) << "Sent in default mode" << data << 100 <<
00169 ZmqMessage::Flush;
00170
00171 ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data << 100 <<
00172 ZmqMessage::Flush;;
00173
00174 ZOut(s, 0) << "Structure in binary mode, numeric as text" <<
00175 ZmqMessage::Binary << data << ZmqMessage::Text << 100 <<
00176 ZmqMessage::Flush;
00177
00178 ZOut(s, 0) << "Structure in default (text) mode, numeric as binary" <<
00179 data << ZmqMessage::Binary << 100 << ZmqMessage::Flush;;
00180
00181 SomeBinaryData binary_data;
00182 binary_data.f1 = 123;
00183 strcpy(binary_data.f2, "another string");
00184
00185 ZOut(s, 0) << "Sent in default mode" << binary_data << 100 <<
00186 ZmqMessage::Flush;
00187
00188 ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data <<
00189 100 << ZmqMessage::Flush;
00190
00191 ZOut(s, 0) << "Structure in binary mode, numeric as text" <<
00192 ZmqMessage::Binary << data << ZmqMessage::Text << 100 <<
00193 ZmqMessage::Flush;
00194
00195 ZOut(s, 0) << "Structure in default (binary) mode, numeric as binary" <<
00196 data << ZmqMessage::Binary << 100 << ZmqMessage::Flush;
00197
00198 ZOut(s, 0) << "Structure in default (binary) mode, "
00199 "numeric as binary too" <<
00200 data << (double)100.1 << ZmqMessage::Flush;
00201 }
00202 catch(const std::exception& ex)
00203 {
00204 std::cout << "caught (main): " << ex.what();
00205 exit(3);
00206 }
00207
00208 pthread_join(worker_tid, 0);
00209
00210 }
00211