Overview Tutorial API reference Examples Build Download | ZmqMessage 0.1 - 21 Oct 2011 |
00001 00009 #include <pthread.h> 00010 #include <vector> 00011 00012 #include <zmq.hpp> 00013 00014 #include "ZmqMessage.hpp" 00015 00024 //0mq context is globally available 00025 zmq::context_t ctx(1); 00026 00027 // endpoint to pass data from main thread to worker and back 00028 const char* endpoint = "inproc://transport"; 00029 00030 00031 // worker runs in a dedicated thread 00032 void* 00033 sorter(void*) 00034 { 00035 try 00036 { 00037 zmq::socket_t s(ctx, ZMQ_REP); 00038 s.connect(endpoint); 00039 // socket to receive data from main thread and to send result back 00040 // is connected 00041 00042 for(;;) 00043 { 00044 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s); 00045 ingress.receive_all(0, 0); 00046 // all parts of incoming message are received 00047 00048 if (ingress.size() == 1) 00049 { 00050 // treat one-part mesages as termination marker 00051 std::cout << "leaving ..." << std::endl; 00052 return 0; 00053 } 00054 00055 std::vector<std::string> sort_area; 00056 00057 std::copy(ingress.begin<std::string>(), 00058 ingress.end<std::string>(), 00059 std::back_inserter(sort_area)); 00060 // every message part is a vector element now 00061 00062 std::sort(sort_area.begin(), sort_area.end()); 00063 // they are semi-lexicography sorted 00064 00065 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(s, 0); 00066 00067 std::copy(sort_area.begin(), 00068 sort_area.end(), 00069 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<std::string>(egress)); 00070 // put to outgoing container 00071 00072 egress.flush(); 00073 // and sent to the main thread 00074 } 00075 } 00076 catch(const std::exception& ex) 00077 { 00078 std::cout << "caught (worker): " << ex.what(); 00079 exit(3); 00080 } 00081 00082 return 0; 00083 } 00084 00085 int 00086 main(int, char**) 00087 { 00088 pthread_t worker_tid; 00089 00090 zmq::socket_t s(ctx, ZMQ_REQ); 00091 s.bind(endpoint); 00092 // socket to talk to worker is bound 00093 00094 try 00095 { 00096 // start worker 00097 pthread_create(&worker_tid, 0, sorter, 0); 00098 00099 std::vector<std::string> desc; 00100 00101 // populate 'zzzzzzzzzz' ... 'aaaaaaaaaa' string 00102 for (char letter = 'z'; letter >= 'a'; --letter) 00103 { 00104 std::string part(10, letter); 00105 desc.push_back(part); 00106 } 00107 00108 std::cout << "Original array:" << std::endl; 00109 std::copy(desc.begin(), 00110 desc.end(), 00111 std::ostream_iterator<std::string>(std::cout, "\n")); 00112 00113 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0); 00114 00115 // we can use std::copy algorithm as well 00116 for (size_t i = 0; i < desc.size(); ++i) 00117 { 00118 to_sort << desc[i]; 00119 } 00120 // strings are in outgoing container 00121 00122 to_sort << "an arbitrary string" << 123; 00123 // extra string and number added to outgoing container 00124 00125 to_sort.flush(); 00126 // outgoing container content is sent 00127 00128 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s); 00129 sorted.receive_all(0, 0); 00130 // worker response received 00131 00132 std::vector<std::string> asc; 00133 00134 std::copy(sorted.begin<std::string>(), 00135 sorted.end<std::string>(), 00136 std::back_inserter(asc)); 00137 // copied to vector 00138 00139 std::cout << "Sorted array:" << std::endl; 00140 std::copy(asc.begin(), 00141 asc.end(), 00142 std::ostream_iterator<std::string>(std::cout, "\n")); 00143 // and printed out to demonstracte it is in ascending order 00144 00145 00146 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(s, 0); 00147 to_stop << "stop" << ZmqMessage::Flush; 00148 // send termination marker to worker 00149 } 00150 catch(const std::exception& ex) 00151 { 00152 std::cout << "caught (main): " << ex.what(); 00153 exit(3); 00154 } 00155 00156 pthread_join(worker_tid, 0); 00157 // thread is completed and sockets are closed 00158 } 00159