00001
00009 #include <pthread.h>
00010 #include <vector>
00011
00012 #include <zmq.hpp>
00013
00014 #include "ZmqMessage.hpp"
00015
00024
00025 zmq::context_t ctx(1);
00026
00027
00028 const char* endpoint = "inproc://transport";
00029
00030
00031
00032 void*
00033 sorter(void*)
00034 {
00035 try
00036 {
00037 zmq::socket_t s(ctx, ZMQ_REP);
00038 s.connect(endpoint);
00039
00040
00041
00042 for(;;)
00043 {
00044 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> ingress(s);
00045 ingress.receive_all(0, 0);
00046
00047
00048 if (ingress.size() == 1)
00049 {
00050
00051 std::cout << "leaving ..." << std::endl;
00052 return 0;
00053 }
00054
00055 std::vector<std::string> sort_area;
00056
00057 std::copy(ingress.begin<std::string>(),
00058 ingress.end<std::string>(),
00059 std::back_inserter(sort_area));
00060
00061
00062 std::sort(sort_area.begin(), sort_area.end());
00063
00064
00065 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> egress(s, 0);
00066
00067 std::copy(sort_area.begin(),
00068 sort_area.end(),
00069 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting>::iterator<std::string>(egress));
00070
00071
00072 egress.flush();
00073
00074 }
00075 }
00076 catch(const std::exception& ex)
00077 {
00078 std::cout << "caught (worker): " << ex.what();
00079 exit(3);
00080 }
00081
00082 return 0;
00083 }
00084
00085 int
00086 main(int, char**)
00087 {
00088 pthread_t worker_tid;
00089
00090 zmq::socket_t s(ctx, ZMQ_REQ);
00091 s.bind(endpoint);
00092
00093
00094 try
00095 {
00096
00097 pthread_create(&worker_tid, 0, sorter, 0);
00098
00099 std::vector<std::string> desc;
00100
00101
00102 for (char letter = 'z'; letter >= 'a'; --letter)
00103 {
00104 std::string part(10, letter);
00105 desc.push_back(part);
00106 }
00107
00108 std::cout << "Original array:" << std::endl;
00109 std::copy(desc.begin(),
00110 desc.end(),
00111 std::ostream_iterator<std::string>(std::cout, "\n"));
00112
00113 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_sort(s, 0);
00114
00115
00116 for (size_t i = 0; i < desc.size(); ++i)
00117 {
00118 to_sort << desc[i];
00119 }
00120
00121
00122 to_sort << "an arbitrary string" << 123;
00123
00124
00125 to_sort.flush();
00126
00127
00128 ZmqMessage::Incoming<ZmqMessage::SimpleRouting> sorted(s);
00129 sorted.receive_all(0, 0);
00130
00131
00132 std::vector<std::string> asc;
00133
00134 std::copy(sorted.begin<std::string>(),
00135 sorted.end<std::string>(),
00136 std::back_inserter(asc));
00137
00138
00139 std::cout << "Sorted array:" << std::endl;
00140 std::copy(asc.begin(),
00141 asc.end(),
00142 std::ostream_iterator<std::string>(std::cout, "\n"));
00143
00144
00145
00146 ZmqMessage::Outgoing<ZmqMessage::SimpleRouting> to_stop(s, 0);
00147 to_stop << "stop" << ZmqMessage::Flush;
00148
00149 }
00150 catch(const std::exception& ex)
00151 {
00152 std::cout << "caught (main): " << ex.what();
00153 exit(3);
00154 }
00155
00156 pthread_join(worker_tid, 0);
00157
00158 }
00159