Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

zsort.cpp

This example launches a separate thread which receives multipart message with arbitrary number of strings to sort. The thread sorts it and sends back the outgoing multipart message. Main thread receives sorted messages.

00001 
00009 #include <pthread.h>
00010 #include <vector>
00011 
00012 #include <zmq.hpp>
00013 
00014 #include "ZmqMessage.hpp"
00015 
00024 //0mq context is globally available
00025 zmq::context_t ctx(1);
00026 
00027 // endpoint to pass data from main thread to worker and back
00028 const char* endpoint = "inproc://transport";
00029 
00030 
00031 // worker runs in a dedicated thread
00032 void*
00033 sorter(void*)
00034 {
00035   try
00036   {
00037     zmq::socket_t s(ctx, ZMQ_REP);
00038     s.connect(endpoint);
00039     // socket to receive data from main thread and to send result back
00040     //   is connected
00041 
00042     for(;;)
00043     {
00044       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00045       ingress.receive_all(0, 0);
00046       // all parts of incoming message are received
00047 
00048       if (ingress.size() == 1)
00049       {
00050         // treat one-part mesages as termination marker
00051         std::cout << "leaving ..." << std::endl;
00052         return 0;
00053       }
00054 
00055       std::vector<std::string> sort_area;
00056 
00057       std::copy(ingress.begin<std::string>(),
00058                 ingress.end<std::string>(),
00059                 std::back_inserter(sort_area));
00060       // every message part is a vector element now
00061 
00062       std::sort(sort_area.begin(), sort_area.end());
00063       // they are semi-lexicography sorted
00064 
00065       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(s, 0);
00066 
00067       std::copy(sort_area.begin(),
00068                 sort_area.end(),
00069                 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<std::string>(egress));
00070       // put to outgoing container
00071 
00072       egress.flush();
00073       // and sent to the main thread
00074     }
00075   }
00076   catch(const std::exception& ex)
00077   {
00078     std::cout << "caught (worker): " << ex.what();
00079     exit(3);
00080   }
00081 
00082   return 0;
00083 }
00084 
00085 int
00086 main(int, char**)
00087 {
00088   pthread_t worker_tid;
00089 
00090   zmq::socket_t s(ctx, ZMQ_REQ);
00091   s.bind(endpoint);
00092   // socket to talk to worker is bound
00093 
00094   try
00095   {
00096     // start worker
00097     pthread_create(&worker_tid, 0, sorter, 0);
00098 
00099     std::vector<std::string> desc;
00100 
00101     // populate 'zzzzzzzzzz' ... 'aaaaaaaaaa' string
00102     for (char letter = 'z'; letter >= 'a'; --letter)
00103     {
00104       std::string part(10, letter);
00105       desc.push_back(part);
00106     }
00107 
00108     std::cout << "Original array:" << std::endl;
00109     std::copy(desc.begin(),
00110               desc.end(),
00111               std::ostream_iterator<std::string>(std::cout, "\n"));
00112 
00113     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0);
00114 
00115     //     we can use std::copy algorithm as well
00116     for (size_t i = 0; i < desc.size(); ++i)
00117     {
00118       to_sort << desc[i];
00119     }
00120     // strings are in outgoing container
00121 
00122     to_sort << "an arbitrary string" << 123;
00123     // extra string and number added to outgoing container
00124 
00125     to_sort.flush();
00126     // outgoing container content is sent
00127 
00128     ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s);
00129     sorted.receive_all(0, 0);
00130     // worker response received
00131 
00132     std::vector<std::string> asc;
00133 
00134     std::copy(sorted.begin<std::string>(),
00135               sorted.end<std::string>(),
00136               std::back_inserter(asc));
00137     // copied to vector
00138 
00139     std::cout << "Sorted array:" << std::endl;
00140     std::copy(asc.begin(),
00141               asc.end(),
00142               std::ostream_iterator<std::string>(std::cout, "\n"));
00143     // and printed out to demonstracte it is in ascending order
00144 
00145 
00146     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(s, 0);
00147     to_stop << "stop" << ZmqMessage::Flush;
00148     // send termination marker to worker
00149   }
00150   catch(const std::exception& ex)
00151   {
00152     std::cout << "caught (main): " << ex.what();
00153     exit(3);
00154   }
00155 
00156   pthread_join(worker_tid, 0);
00157   // thread is completed and sockets are closed
00158 }
00159 

ZmqMessage 0.1 — open source software, support@zmqmessage.org