Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

zserialize.cpp

This example launches a separate thread which receives multipart messages contain different data types serialized in different ways

00001 
00009 #include <pthread.h>
00010 #include <vector>
00011 
00012 #include <zmq.hpp>
00013 
00014 #include "ZmqMessage.hpp"
00015 
00023 //0mq context is globally available
00024 zmq::context_t ctx(1);
00025 
00026 // endpoint to pass data from main thread to worker and back
00027 const char* endpoint = "inproc://transport";
00028 
00029 struct SomeData
00030 {
00031   int f1;
00032   char f2[20];
00033 };
00034 
00035 // it is possible to send SomeData objects
00036 //  in text form which require serialization
00037 //  and desirialization methods.
00038 
00039 
00040 template <class charT, class traits>
00041 std::basic_ostream<charT, traits>&
00042 operator<<(std::basic_ostream<charT, traits>& strm,
00043            const SomeData& d)
00044 {
00045   strm << d.f1 << ':' << d.f2;
00046   return strm;
00047 }
00048 
00049 template <class charT, class traits>
00050 std::basic_istream<charT, traits>&
00051 operator>>(std::basic_istream<charT, traits>& strm,
00052            SomeData& d)
00053 {
00054   char ignore;
00055   strm >> d.f1  >> ignore >> d.f2;
00056   return strm;
00057 }
00058 
00059 // to send in binary form
00060 //  because of "raw_mark" tag
00061 struct SomeBinaryData
00062 {
00063   typedef void raw_mark;
00064 
00065   int f1;
00066   char f2[20];
00067 };
00068 
00069 // to send in binary form
00070 ZMQMESSAGE_BINARY_TYPE(double)
00071 ;
00072 
00073 const char* to_worker_fields[] = {"text", "structure", "numeric"};
00074 
00075 // worker runs in a dedicated thread
00076 void*
00077 receiver(void*)
00078 {
00079   try
00080   {
00081     zmq::socket_t s(ctx, ZMQ_PULL);
00082     s.connect(endpoint);
00083     // socket to receive data from main thread
00084     //   is connected
00085 
00086     typedef ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ZIn;
00087 
00088     std::string text;
00089     SomeData data;
00090     int num;
00091 
00092     // receive data
00093     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >> data >> num;
00094     //   and print it out
00095     std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00096 
00097     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00098       ZmqMessage::Binary>> data >> num;
00099     std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00100 
00101     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00102       ZmqMessage::Binary>> data >> ZmqMessage::Text >> num;
00103     std::cout << text << ' ' << data << num << ", numeric " << std::endl;
00104 
00105     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00106       data >> ZmqMessage::Text >> num;
00107     std::cout << text << ' ' << data << ", numeric " << num << std::endl;
00108 
00109     SomeBinaryData binary_data;
00110 
00111     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00112       binary_data >> ZmqMessage::Text >> num;
00113     std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00114       ", numeric " << num << std::endl;
00115 
00116     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00117       ZmqMessage::Binary >> binary_data >> ZmqMessage::Text >> num;
00118     std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00119       ", numeric " << num << std::endl;
00120 
00121     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00122       binary_data >> ZmqMessage::Text >> num;
00123     std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00124       ", numeric " << num << std::endl;
00125 
00126     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00127       binary_data >> ZmqMessage::Binary >> num;
00128     std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00129       ", numeric " << num << std::endl;
00130 
00131     double double_num;
00132 
00133     ZIn(s).receive(3, to_worker_fields, 3, true) >> text >>
00134       binary_data >> double_num;
00135     std::cout << text << ' ' << binary_data.f1 << ':' << binary_data.f2 <<
00136       ", numeric " << double_num << std::endl;
00137 
00138   }
00139   catch(const std::exception& ex)
00140   {
00141     std::cout << "caught (worker): " << ex.what();
00142     exit(3);
00143   }
00144 
00145   return 0;
00146 }
00147 
00148 int
00149 main(int, char**)
00150 {
00151   pthread_t worker_tid;
00152 
00153   zmq::socket_t s(ctx, ZMQ_PUSH);
00154   s.bind(endpoint);
00155   // socket to talk to worker is bound
00156 
00157   try
00158   {
00159     // start worker
00160     pthread_create(&worker_tid, 0, receiver, 0);
00161 
00162     SomeData data;
00163     data.f1 = 123;
00164     strcpy(data.f2, "a string");
00165 
00166     typedef ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> ZOut;
00167 
00168     ZOut(s, 0) << "Sent in default mode" << data << 100 <<
00169       ZmqMessage::Flush;
00170 
00171     ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data << 100 <<
00172       ZmqMessage::Flush;;
00173 
00174     ZOut(s, 0) << "Structure in binary mode, numeric as text" <<
00175       ZmqMessage::Binary << data << ZmqMessage::Text << 100 <<
00176       ZmqMessage::Flush;
00177 
00178     ZOut(s, 0) << "Structure in default (text) mode, numeric as binary" <<
00179       data << ZmqMessage::Binary << 100 << ZmqMessage::Flush;;
00180 
00181     SomeBinaryData binary_data;
00182     binary_data.f1 = 123;
00183     strcpy(binary_data.f2, "another string");
00184 
00185     ZOut(s, 0) << "Sent in default mode" << binary_data << 100 <<
00186       ZmqMessage::Flush;
00187 
00188     ZOut(s, 0) << "Sent in binary mode" << ZmqMessage::Binary << data <<
00189       100 << ZmqMessage::Flush;
00190 
00191     ZOut(s, 0) << "Structure in binary mode, numeric as text" <<
00192       ZmqMessage::Binary << data << ZmqMessage::Text << 100 <<
00193       ZmqMessage::Flush;
00194 
00195     ZOut(s, 0) << "Structure in default (binary) mode, numeric as binary" <<
00196       data << ZmqMessage::Binary << 100 << ZmqMessage::Flush;
00197 
00198     ZOut(s, 0) << "Structure in default (binary) mode, "
00199       "numeric as binary too" <<
00200       data << (double)100.1 << ZmqMessage::Flush;
00201   }
00202   catch(const std::exception& ex)
00203   {
00204     std::cout << "caught (main): " << ex.what();
00205     exit(3);
00206   }
00207 
00208   pthread_join(worker_tid, 0);
00209   // thread is completed and sockets are closed
00210 }
00211 

ZmqMessage 0.1 — open source software, support@zmqmessage.org