00001 
00009 #include <pthread.h>
00010 #include <vector>
00011 
00012 #include <zmq.hpp>
00013 
00014 #include "ZmqMessage.hpp"
00015 
00024 
00025 zmq::context_t ctx(1);
00026 
00027 
00028 const char* endpoint = "inproc://transport";
00029 
00030 
00031 
00032 void*
00033 sorter(void*)
00034 {
00035   try
00036   {
00037     zmq::socket_t s(ctx, ZMQ_REP);
00038     s.connect(endpoint);
00039     
00040     
00041 
00042     for(;;)
00043     {
00044       ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00045       ingress.receive_all(0, 0);
00046       
00047 
00048       if (ingress.size() == 1)
00049       {
00050         
00051         std::cout << "leaving ..." << std::endl;
00052         return 0;
00053       }
00054 
00055       std::vector<std::string> sort_area;
00056 
00057       std::copy(ingress.begin<std::string>(),
00058                 ingress.end<std::string>(),
00059                 std::back_inserter(sort_area));
00060       
00061 
00062       std::sort(sort_area.begin(), sort_area.end());
00063       
00064 
00065       ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(s, 0);
00066 
00067       std::copy(sort_area.begin(),
00068                 sort_area.end(),
00069                 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<std::string>(egress));
00070       
00071 
00072       egress.flush();
00073       
00074     }
00075   }
00076   catch(const std::exception& ex)
00077   {
00078     std::cout << "caught (worker): " << ex.what();
00079     exit(3);
00080   }
00081 
00082   return 0;
00083 }
00084 
00085 int
00086 main(int, char**)
00087 {
00088   pthread_t worker_tid;
00089 
00090   zmq::socket_t s(ctx, ZMQ_REQ);
00091   s.bind(endpoint);
00092   
00093 
00094   try
00095   {
00096     
00097     pthread_create(&worker_tid, 0, sorter, 0);
00098 
00099     std::vector<std::string> desc;
00100 
00101     
00102     for (char letter = 'z'; letter >= 'a'; --letter)
00103     {
00104       std::string part(10, letter);
00105       desc.push_back(part);
00106     }
00107 
00108     std::cout << "Original array:" << std::endl;
00109     std::copy(desc.begin(),
00110               desc.end(),
00111               std::ostream_iterator<std::string>(std::cout, "\n"));
00112 
00113     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0);
00114 
00115     
00116     for (size_t i = 0; i < desc.size(); ++i)
00117     {
00118       to_sort << desc[i];
00119     }
00120     
00121 
00122     to_sort << "an arbitrary string" << 123;
00123     
00124 
00125     to_sort.flush();
00126     
00127 
00128     ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s);
00129     sorted.receive_all(0, 0);
00130     
00131 
00132     std::vector<std::string> asc;
00133 
00134     std::copy(sorted.begin<std::string>(),
00135               sorted.end<std::string>(),
00136               std::back_inserter(asc));
00137     
00138 
00139     std::cout << "Sorted array:" << std::endl;
00140     std::copy(asc.begin(),
00141               asc.end(),
00142               std::ostream_iterator<std::string>(std::cout, "\n"));
00143     
00144 
00145 
00146     ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(s, 0);
00147     to_stop << "stop" << ZmqMessage::Flush;
00148     
00149   }
00150   catch(const std::exception& ex)
00151   {
00152     std::cout << "caught (main): " << ex.what();
00153     exit(3);
00154   }
00155 
00156   pthread_join(worker_tid, 0);
00157   
00158 }
00159