Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

zbisort.cpp

This is a slightly more complex example. It demonstrates Text and Binary modes, iterators, and message relaying.

The main thread starts a relay thread (relay(void*)), which in turn starts two sorting workers. One worker (sorter<int>(void*)) sorts incoming message parts as integers, in numeric order; the other (sorter<StringFace>(void*)) sorts them as strings, in alphanumeric order. The relay thread is a dispatcher: it reads the first message part (the "mode" part, which can be "INT", "STRING" or "STOP") and relays all remaining message parts to the appropriate worker. The worker's response is relayed back to the main thread.

We also implement a simple class, StringFace, that conforms to the string concept (see ZMQMESSAGE_STRING_CLASS for a description) and merely wraps external memory, which avoids intermediate copying while sorting.

00001 
#include "pthread.h"

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <iterator>
#include <stdexcept>
#include <string>
#include <vector>

#include <zmq.hpp>

#include "ZmqMessage.hpp"

#include "StringFace.hpp"
00018 
00039 // 0mq context is globally available: the main, relay and sorter threads all create their sockets from it
00040 zmq::context_t ctx(1);
00041 
// endpoint to pass data from main thread to relay and back
// (the pointers are const: every thread reads them, nobody may reseat them)
const char* const relay_endpoint = "inproc://relay";
// endpoints to pass data from relay thread to string and numeric workers and back
const char* const string_endpoint = "inproc://string";
const char* const int_endpoint = "inproc://int";

// values of the leading "mode" message part that the relay thread dispatches on
const std::string int_mode = "INT";
const std::string string_mode = "STRING";
const std::string stop_mode = "STOP";
00051 
00052 // worker run in a dedicated thread. Launched by relay thread.
00053 template <typename T>
00054 void*
00055 sorter(void* endpoint)
00056 {
00057   try
00058   {
00059     zmq::socket_t s(ctx, ZMQ_REP);
00060     s.connect(static_cast<char*>(endpoint));
00061     std::cout << "connected to " << static_cast<char*>(endpoint) << std::endl;
00062     // socket to receive data from main thread and to send result back
00063     //   is connected
00064 
00065     for(;;)
00066     {
00067       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00068       ingress.receive_all();
00069       // all parts of incoming message are received
00070 
00071       if (ingress.size() == 1)
00072       {
00073         // treat one-part mesages as termination marker
00074         std::cout << "leaving ..." << std::endl;
00075         return 0;
00076       }
00077 
00078       std::vector<T> sort_area;
00079 
00080       std::copy(ingress.begin<T>(),
00081                 ingress.end<T>(),
00082                 std::back_inserter(sort_area));
00083       // every message part is a vector element now
00084 
00085       std::sort(sort_area.begin(), sort_area.end());
00086       // they are sorted according to type
00087 
00088       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(s, 0);
00089 
00090       std::copy(sort_area.begin(),
00091                 sort_area.end(),
00092                 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<T>(egress));
00093       // put to outgoing container
00094 
00095       egress.flush();
00096       // and sent to the main thread
00097     }
00098   }
00099   catch(const std::exception& ex)
00100   {
00101     std::cout << "caught (worker): " << ex.what();
00102     exit(3);
00103   }
00104 
00105   return 0;
00106 }
00107 
00108 void*
00109 relay(void*)
00110 {
00111   pthread_t string_tid;
00112   pthread_t int_tid;
00113 
00114   try
00115   {
00116     zmq::socket_t relay(ctx, ZMQ_REP);
00117     relay.connect(relay_endpoint);
00118 
00119     zmq::socket_t s(ctx, ZMQ_REQ);
00120     s.bind(string_endpoint);
00121 
00122     zmq::socket_t i(ctx, ZMQ_REQ);
00123     i.bind(int_endpoint);
00124 
00125     // start workers
00126     pthread_create(&string_tid, 0, sorter<StringFace>, const_cast<char*>(string_endpoint));
00127     pthread_create(&int_tid, 0, sorter<int>, const_cast<char*>(int_endpoint));
00128 
00129     for(;;)
00130     {
00131       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(relay);
00132       ingress.receive(1, false);
00133 
00134       StringFace mode;
00135       ingress >> mode;
00136 
00137       if (mode == int_mode)
00138       {
00139         ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_int(i, 0);
00140         to_int.relay_from(relay);
00141         // request is read
00142         to_int << 38 << ZmqMessage::Flush;
00143         // extra stuff added. sent to worker.
00144 
00145         ZmqMessage::relay_raw(i, relay, false);
00146         // answer from worker resent to main thread
00147       }
00148       else if (mode == string_mode)
00149       {
00150         ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_string(s, 0);
00151         to_string.relay_from(relay);
00152         to_string << "some addition" << ZmqMessage::Flush;
00153 
00154         ZmqMessage::relay_raw(s, relay, false);
00155         // answer from worker resent to main thread
00156       }
00157       else if (mode == stop_mode)
00158       {
00159         ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop_int(i, 0);
00160         to_stop_int << stop_mode << ZmqMessage::Flush;
00161 
00162         ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop_string(s, 0);
00163         to_stop_string << stop_mode << ZmqMessage::Flush;
00164 
00165         break;
00166       }
00167       else
00168       {
00169         assert(!"Unknown mode");
00170       }
00171     }
00172   }
00173   catch(const std::exception& ex)
00174   {
00175     std::cout << "caught (relay): " << ex.what();
00176     exit(3);
00177   }
00178 
00179   pthread_join(string_tid, 0);
00180   pthread_join(int_tid, 0);
00181   // worker threads are completed and sockets are closed
00182 
00183   return 0;
00184 }
00185 
00186 int
00187 main(int, char**)
00188 {
00189   pthread_t relay_tid;
00190 
00191   try
00192   {
00193     zmq::socket_t s(ctx, ZMQ_REQ);
00194     s.bind(relay_endpoint);
00195     // socket to talk to worker is bound
00196 
00197     // start relay
00198     pthread_create(&relay_tid, 0, relay, 0);
00199 
00200     {
00201       std::vector<std::string> desc;
00202 
00203       // populate 'zzzzzzzzzz' ... 'aaaaaaaaaa' string
00204       for (char letter = 'z'; letter >= 'a'; --letter)
00205       {
00206         std::string part(10, letter);
00207         desc.push_back(part);
00208       }
00209 
00210       std::cout << "Original array:" << std::endl;
00211       std::copy(desc.begin(), desc.end(),
00212                 std::ostream_iterator<std::string>(std::cout, "\n"));
00213 
00214       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0);
00215 
00216       to_sort << string_mode;
00217       std::copy(desc.begin(), desc.end(),
00218                 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<std::string>(to_sort));
00219       // strings are in outgoing container
00220 
00221       to_sort.flush();
00222       // outgoing container content is sent
00223 
00224       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s);
00225       sorted.receive_all(0, 0);
00226       // worker response received
00227 
00228       std::vector<std::string> asc;
00229 
00230       std::copy(sorted.begin<std::string>(), sorted.end<std::string>(),
00231                 std::back_inserter(asc));
00232       // copied to vector
00233 
00234       std::cout << "Sorted array:" << std::endl;
00235       std::copy(asc.begin(), asc.end(),
00236                 std::ostream_iterator<std::string>(std::cout, "\n"));
00237       // and printed out to demonstracte it is in ascending order
00238     }
00239 
00240     {
00241       std::vector<int> desc;
00242 
00243       // populate 100 ... 90 numbers
00244       for (int number = 100; number >= 90; --number)
00245       {
00246         desc.push_back(number);
00247       }
00248 
00249 
00250       std::cout << "Original array:" << std::endl;
00251       std::copy(desc.begin(), desc.end(),
00252                 std::ostream_iterator<int>(std::cout, "\n"));
00253 
00254       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0);
00255 
00256       to_sort << int_mode;
00257 
00258       to_sort << 950;
00259       // to prove we do numeric sort
00260 
00261       std::copy(desc.begin(), desc.end(),
00262                 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<int>(to_sort));
00263       // numbers are in outgoing container
00264 
00265       to_sort.flush();
00266       // outgoing container content is sent
00267 
00268       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s);
00269       sorted.receive_all(0, 0);
00270       // worker response received
00271 
00272       std::vector<int> asc;
00273 
00274       std::copy(sorted.begin<int>(), sorted.end<int>(),
00275                 std::back_inserter(asc));
00276       // copied to vector
00277 
00278       std::cout << "Sorted array:" << std::endl;
00279       std::copy(asc.begin(), asc.end(),
00280                 std::ostream_iterator<int>(std::cout, "\n"));
00281       // and printed out to demonstracte it is in ascending order
00282     }
00283 
00284 
00285     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(s, 0);
00286     to_stop << stop_mode;// << ZmqMessage::Flush;
00287     to_stop.flush();
00288     // send termination marker to worker
00289   }
00290   catch(const std::exception& ex)
00291   {
00292     std::cout << "caught (main): " << ex.what();
00293     exit(3);
00294   }
00295 
00296   pthread_join(relay_tid, 0);
00297   // relay thread is completed and sockets are closed
00298 }

ZmqMessage 0.1 — open source software, support@zmqmessage.org