00001
00013 #ifndef ZMQMESSAGE_ZMQMESSAGEFULLIMPL_HPP_
00014 #define ZMQMESSAGE_ZMQMESSAGEFULLIMPL_HPP_
00015
00016 #include <tr1/functional>
00017
00018 namespace ZmqMessage
00019 {
00020 void
00021 send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00022 throw(ZmqErrorType)
00023 {
00024 int base_flags = nonblock ? ZMQ_NOBLOCK : 0;
00025 for (size_t i = 0; i < multipart.size(); ++i)
00026 {
00027 int flags = base_flags | ((i < multipart.size()-1) ? ZMQ_SNDMORE : 0);
00028 send_msg(sock, *(multipart.parts_[i]), flags);
00029 }
00030 }
00031
00032 void
00033 Multipart::clear()
00034 {
00035 std::for_each(
00036 parts_.begin(), parts_.end(),
00037 &Private::del_obj_not_null<zmq::message_t>
00038 );
00039 parts_.clear();
00040 }
00041
00042 Multipart*
00043 Multipart::detach()
00044 {
00045 std::auto_ptr<Multipart> m(new Multipart());
00046 m->reserve(parts_.size());
00047 m->parts_ = parts_;
00048 parts_.assign(parts_.size(), 0);
00049 return m.release();
00050 }
00051
00052 void
00053 Multipart::check_has_part(size_t n) const throw(NoSuchPartError)
00054 {
00055 if (n >= size())
00056 {
00057 std::ostringstream ss;
00058 ss << "Multipart zmq message has only " << size() <<
00059 " parts received, but we requested index: " << n;
00060 throw NoSuchPartError(ss.str());
00061 }
00062 if (parts_[n] == 0)
00063 {
00064 std::ostringstream ss;
00065 ss << "Multipart zmq message at " << n <<
00066 " is not owned by this multipart";
00067 throw NoSuchPartError(ss.str());
00068 }
00069 }
00070
00071 zmq::message_t&
00072 Multipart::operator[](size_t i) throw (NoSuchPartError)
00073 {
00074 check_has_part(i);
00075 return *(parts_[i]);
00076 }
00077
00078 zmq::message_t*
00079 Multipart::release(size_t i)
00080 {
00081 if (i >= size())
00082 {
00083 return 0;
00084 }
00085 zmq::message_t* p = parts_[i];
00086 parts_[i] = 0;
00087 return p;
00088 }
00089
00090 void
00091 XRouting::log_routing_received() const
00092 {
00093 ZMQMESSAGE_LOG_STREAM << "Receiving multipart, route received: "
00094 << routing_.size() << " parts";
00095 }
00096
00097 void
00098 XRouting::receive_routing(zmq::socket_t& sock)
00099 throw (MessageFormatError, ZmqErrorType)
00100 {
00101 if (!routing_.empty())
00102 {
00103 return;
00104 }
00105 routing_.reserve(3);
00106 for (int i = 0; ; ++i)
00107 {
00108 zmq::message_t* msg = new zmq::message_t;
00109 routing_.push_back(msg);
00110
00111 recv_msg(sock, *msg);
00112 ZMQMESSAGE_LOG_STREAM << "Received X route: " << msg->size() << "bytes;"
00113 << ZMQMESSAGE_LOG_TERM;
00114
00115 if (msg->size() == 0)
00116 {
00117 break;
00118 }
00119 if (!has_more(sock))
00120 {
00121 std::ostringstream ss;
00122 ss << "Receiving multipart message: reading route info failed: "
00123 << "part " << (i+1) << " has nothing after it. "
00124 << "Routing info doesn't end with null message";
00125 throw MessageFormatError(ss.str());
00126 }
00127 }
00128 }
00129
00130 XRouting::~XRouting()
00131 {
00132 std::for_each(
00133 routing_.begin(), routing_.end(),
00134 &Private::del_obj_not_null<zmq::message_t>
00135 );
00136 }
00137
00138 template <class RoutingPolicy>
00139 void
00140 Incoming<RoutingPolicy>::check_is_terminal() const throw(MessageFormatError)
00141 {
00142 if (!is_terminal_)
00143 {
00144 std::ostringstream ss;
00145 ss <<
00146 "Receiving multipart "
00147 "Has more messages after part " << size() << ", but must be terminal";
00148 throw MessageFormatError(ss.str());
00149 }
00150 }
00151
00152 template <class RoutingPolicy>
00153 void
00154 Incoming<RoutingPolicy>::append_message_data(
00155 zmq::message_t& message, std::vector<char>& area) const
00156 {
00157 std::vector<char>::size_type sz = area.size();
00158 area.resize(sz + message.size());
00159 std::copy(static_cast<char*>(message.data()),
00160 static_cast<char*>(message.data()) + message.size(),
00161 area.begin() + sz
00162 );
00163 }
00164
00165 template <class RoutingPolicy>
00166 bool
00167 Incoming<RoutingPolicy>::receive_one() throw(ZmqErrorType)
00168 {
00169 parts_.push_back(new zmq::message_t);
00170 zmq::message_t& cur_part = *(parts_.back());
00171
00172 recv_msg(src_, cur_part);
00173 bool more = has_more(src_);
00174
00175 ZMQMESSAGE_LOG_STREAM << "Incoming received "
00176 << cur_part.size() << " bytes: "
00177 << ZMQMESSAGE_STRING_CLASS((const char*)cur_part.data(),
00178 std::min(cur_part.size(), static_cast<size_t>(256)))
00179 << ", has more = " << more << ZMQMESSAGE_LOG_TERM;
00180 return more;
00181 }
00182
00183 template <class RoutingPolicy>
00184 void
00185 Incoming<RoutingPolicy>::validate(
00186 const char* part_names[],
00187 size_t part_names_length, bool strict)
00188 throw (MessageFormatError)
00189 {
00190 if (parts_.size() < part_names_length)
00191 {
00192 std::ostringstream ss;
00193 ss <<
00194 "Validating received multipart: "
00195 "No more messages after " << part_names[parts_.size() - 1] <<
00196 "(" << (parts_.size()) << "), expected " <<
00197 (strict ? "exactly" : "at least") << " " << part_names_length;
00198 throw MessageFormatError(ss.str());
00199 }
00200 if (strict && parts_.size() > part_names_length)
00201 {
00202 std::ostringstream ss;
00203 ss <<
00204 "Validating received multipart: "
00205 "Have received " << parts_.size() << " parts, while expected "
00206 "exactly " << part_names_length;
00207 throw MessageFormatError(ss.str());
00208 }
00209 }
00210
00211 template <class RoutingPolicy>
00212 Incoming <RoutingPolicy>&
00213 Incoming<RoutingPolicy>::receive(
00214 size_t parts, const char* part_names[],
00215 size_t part_names_length, bool check_terminal)
00216 throw (MessageFormatError, ZmqErrorType)
00217 {
00218 RoutingPolicy::receive_routing(src_);
00219 RoutingPolicy::log_routing_received();
00220
00221 for (size_t i = 0, init_parts = size(); i < parts; ++i)
00222 {
00223 bool more = receive_one();
00224 const char* const part_name =
00225 (i < part_names_length) ? part_names[i] : "<unnamed>";
00226
00227 if (i < parts - 1 && !more)
00228 {
00229 is_terminal_ = true;
00230 std::ostringstream ss;
00231 ss <<
00232 "Receiving multipart: "
00233 "No more messages after " << part_name <<
00234 "(" << (init_parts + i) << "), expected more";
00235 throw MessageFormatError(ss.str());
00236 }
00237 if (i == parts - 1 && more)
00238 {
00239 is_terminal_= false;
00240 if (check_terminal)
00241 {
00242 std::ostringstream ss;
00243 ss <<
00244 "Receiving multipart: "
00245 "Has more messages after " << part_name <<
00246 "(" << (init_parts + i) << "), expected no more messages";
00247 throw MessageFormatError(ss.str());
00248 }
00249 }
00250 if (i == parts - 1 && !more)
00251 {
00252 is_terminal_ = true;
00253 }
00254 }
00255 return *this;
00256 }
00257
00258 template <class RoutingPolicy>
00259 Incoming <RoutingPolicy>&
00260 Incoming<RoutingPolicy>::receive_all(
00261 size_t min_parts, const char* part_names[], size_t part_names_length)
00262 throw (MessageFormatError, ZmqErrorType)
00263 {
00264 receive(min_parts, part_names, part_names_length, false);
00265
00266 while (!is_terminal_)
00267 {
00268 is_terminal_ = !receive_one();
00269 }
00270
00271 return *this;
00272 }
00273
00274 template <class RoutingPolicy>
00275 Incoming <RoutingPolicy>&
00276 Incoming<RoutingPolicy>::receive_up_to(
00277 size_t min_parts,
00278 const char* part_names[], size_t max_parts)
00279 throw (MessageFormatError, ZmqErrorType)
00280 {
00281 receive(min_parts, part_names, false);
00282
00283 for (size_t n = min_parts; n < max_parts && !is_terminal_; ++n)
00284 {
00285 is_terminal_ = !receive_one();
00286 }
00287
00288 return *this;
00289 }
00290
00291 template <class RoutingPolicy>
00292 int
00293 Incoming<RoutingPolicy>::fetch_tail(
00294 std::vector<char>& area, const char* delimiter) throw (ZmqErrorType)
00295 {
00296 append_message_data(*(parts_.back()), area);
00297 if (is_terminal_)
00298 {
00299 return 1;
00300 }
00301
00302 size_t delim_sz = (delimiter) ? ::strlen(delimiter) : 0;
00303
00304 int num_messages = 1;
00305 for (; has_more(src_); ++num_messages)
00306 {
00307 if (delim_sz)
00308 {
00309 std::vector<char>::size_type sz = area.size();
00310 area.resize(sz + delim_sz);
00311 std::copy(delimiter, delimiter + delim_sz, area.begin() + sz);
00312 }
00313 zmq::message_t data_buff;
00314 recv_msg(src_, data_buff);
00315 append_message_data(data_buff, area);
00316 }
00317 return num_messages;
00318 }
00319
00320 template <class RoutingPolicy>
00321 int
00322 Incoming<RoutingPolicy>::drop_tail() throw(ZmqErrorType)
00323 {
00324 if (is_terminal_)
00325 {
00326 return 0;
00327 }
00328
00329 zmq::message_t data_buff;
00330 int num_messages = 0;
00331 if (parts_.empty())
00332 {
00333 if (!try_recv_msg(src_, data_buff, ZMQ_NOBLOCK))
00334 {
00335 return 0;
00336 }
00337 num_messages = 1;
00338 }
00339
00340 for (; has_more(src_); ++num_messages)
00341 {
00342 recv_msg(src_, data_buff);
00343 }
00344 return num_messages;
00345 }
00346
00347 template <class RoutingPolicy>
00348 Incoming<RoutingPolicy>&
00349 Incoming<RoutingPolicy>::operator>> (
00350 zmq::message_t& msg) throw(NoSuchPartError)
00351 {
00352 check_has_part(cur_extract_idx_);
00353 copy_msg(msg, *(parts_[cur_extract_idx_++]));
00354 return *this;
00355 }
00356
00357 Sink&
00358 NullMessage(Sink& out)
00359 {
00360 out << *(new zmq::message_t(0));
00361 return out;
00362 }
00363
00364 Sink&
00365 Flush(Sink& out)
00366 {
00367 out.flush();
00368 return out;
00369 }
00370
00371 template <>
00372 void
00373 Outgoing<SimpleRouting>::send_routing(
00374 MsgPtrVec* routing) throw (ZmqErrorType)
00375 {
00376 }
00377
00378 template <>
00379 void
00380 Outgoing<XRouting>::send_routing(
00381 MsgPtrVec* routing) throw (ZmqErrorType)
00382 {
00383 if (routing == 0)
00384 {
00385 ZMQMESSAGE_LOG_STREAM <<
00386 "X route: route vector is empty, send null message only"
00387 << ZMQMESSAGE_LOG_TERM;
00388 send_one(new zmq::message_t, false);
00389 }
00390 else
00391 {
00392 bool copy = options() & OutOptions::COPY_INCOMING;
00393 for (MsgPtrVec::iterator it = routing->begin();
00394 it != routing->end(); ++it)
00395 {
00396 zmq::message_t* msg = *it;
00397 if (!copy)
00398 {
00399 *it = 0;
00400 }
00401 send_one(msg, copy);
00402 }
00403 }
00404 }
00405
00406 Sink&
00407 Sink::operator<< (zmq::message_t& msg)
00408 throw (ZmqErrorType)
00409 {
00410 bool copy_mode = options_ & OutOptions::COPY_INCOMING;
00411 bool use_copy = false;
00412 if (incoming_)
00413 {
00414 MsgPtrVec::iterator it = std::find(
00415 incoming_->parts_.begin(), incoming_->parts_.end(), &msg);
00416 if (it != incoming_->parts_.end())
00417 {
00418 use_copy = copy_mode;
00419 if (!copy_mode)
00420 {
00421 *it = 0;
00422 }
00423 }
00424 }
00425 send_one(&msg, use_copy);
00426 return *this;
00427 }
00428
00429 Sink&
00430 Sink::operator<< (const RawMessage& m)
00431 throw (ZmqErrorType)
00432 {
00433 if (m.deleter)
00434 {
00435 send_owned(new zmq::message_t(m.data.ptr, m.sz, m.deleter, 0));
00436 }
00437 else
00438 {
00439 MsgPtr msg(new zmq::message_t);
00440 init_msg(m.data.cptr, m.sz, *msg);
00441 send_owned(msg.release());
00442 }
00443 return *this;
00444 }
00445
00446 void
00447 Sink::add_to_queue(
00448 zmq::message_t* msg)
00449 {
00450 outgoing_queue_->parts_.push_back(msg);
00451 }
00452
00453 void
00454 Sink::do_send_one(
00455 zmq::message_t* msg, bool last)
00456 throw (ZmqErrorType)
00457 {
00458 int flag = 0;
00459 if (!last) flag |= ZMQ_SNDMORE;
00460 if (options_ & OutOptions::NONBLOCK) flag |= ZMQ_NOBLOCK;
00461
00462 ZMQMESSAGE_LOG_STREAM
00463 << "Outgoing sending msg, " << msg->size() << " bytes: "
00464 << ZMQMESSAGE_STRING_CLASS((const char*)msg->data(),
00465 std::min(msg->size(), static_cast<size_t>(256)))
00466 << ", flag = " << flag << ZMQMESSAGE_LOG_TERM;
00467
00468 send_msg(dst_, *msg, flag);
00469 }
00470
00471 bool
00472 Sink::try_send_first_cached(bool last)
00473 throw(ZmqErrorType)
00474 {
00475 assert(state_ == NOTSENT);
00476 assert(cached_.get());
00477 bool ret = true;
00478 try
00479 {
00480 if (options_ & OutOptions::EMULATE_BLOCK_SENDS)
00481 {
00482 errno = EAGAIN;
00483 ZMQMESSAGE_LOG_STREAM
00484 << "Emulating blocking send!" << ZMQMESSAGE_LOG_TERM;
00485 throw_zmq_error("Emulating blocking send");
00486 }
00487 else
00488 {
00489 do_send_one(cached_.get(), last);
00490 state_ = SENDING;
00491 }
00492 }
00493 catch (const ZmqErrorType& e)
00494 {
00495 ret = false;
00496 if (errno == EAGAIN)
00497 {
00498 if (options_ & OutOptions::CACHE_ON_BLOCK)
00499 {
00500 ZMQMESSAGE_LOG_STREAM
00501 << "Cannot send first outgoing message: would block: start caching"
00502 << ZMQMESSAGE_LOG_TERM;
00503 state_ = QUEUEING;
00504 outgoing_queue_.reset(new Multipart);
00505 add_to_queue(cached_.release());
00506 }
00507 else if (options_ & OutOptions::DROP_ON_BLOCK)
00508 {
00509 ZMQMESSAGE_LOG_STREAM <<
00510 "Cannot send first outgoing message: would block: dropping" <<
00511 ZMQMESSAGE_LOG_TERM;
00512 state_ = DROPPING;
00513 }
00514 else
00515 {
00516 throw;
00517 }
00518 }
00519 else
00520 {
00521 cached_.reset(0);
00522 ZMQMESSAGE_LOG_STREAM <<
00523 "Cannot send first outgoing message: error: " << e.what() <<
00524 ZMQMESSAGE_LOG_TERM;
00525
00526 throw;
00527 }
00528 }
00529 return ret;
00530 }
00531
00532 void
00533 Sink::send_one(
00534 zmq::message_t* msg, bool use_copy) throw(ZmqErrorType)
00535 {
00536 MsgPtr p(0);
00537 if (use_copy)
00538 {
00539 p.reset(new zmq::message_t);
00540 copy_msg(*p, *msg);
00541 }
00542 else
00543 {
00544 p.reset(msg);
00545 }
00546 send_owned(p.release());
00547 }
00548
00549 void
00550 Sink::send_owned(
00551 zmq::message_t* owned) throw(ZmqErrorType)
00552 {
00553 MsgPtr msg(owned);
00554 switch (state_)
00555 {
00556 case NOTSENT:
00557 if (!cached_.get())
00558 {
00559 cached_ = msg;
00560 break;
00561 }
00562 if (try_send_first_cached(false))
00563 {
00564 cached_ = msg;
00565 }
00566 else
00567 {
00568 if (state_ == QUEUEING)
00569 {
00570 add_to_queue(msg.release());
00571 }
00572 }
00573
00574 break;
00575 case SENDING:
00576 if (cached_.get())
00577 {
00578 do_send_one(cached_.get(), false);
00579 cached_ = msg;
00580 }
00581 else
00582 {
00583 ZMQMESSAGE_LOG_STREAM <<
00584 "Outgoing message in state SENDING, no messages cached yet - strange"
00585 << ZMQMESSAGE_LOG_TERM;
00586 cached_ = msg;
00587 }
00588
00589 break;
00590 case QUEUEING:
00591 assert(outgoing_queue_.get());
00592 add_to_queue(msg.release());
00593
00594 break;
00595 case DROPPING:
00596 break;
00597 case FLUSHED:
00598 ZMQMESSAGE_LOG_STREAM << "trying to send a message in FLUSHED state"
00599 << ZMQMESSAGE_LOG_TERM;
00600 break;
00601 }
00602 }
00603
00604 void
00605 Sink::flush() throw(ZmqErrorType)
00606 {
00607 if (state_ == DROPPING)
00608 {
00609 return;
00610 }
00611 if (!cached_.get())
00612 {
00613 state_ = FLUSHED;
00614 return;
00615 }
00616
00617
00618 switch (state_)
00619 {
00620 case NOTSENT:
00621 try_send_first_cached(true);
00622 break;
00623 case SENDING:
00624 do_send_one(cached_.get(), true);
00625 break;
00626
00627 default:
00628
00629 break;
00630 }
00631 cached_.reset(0);
00632 state_ = FLUSHED;
00633 }
00634
00635 void
00636 Sink::send_incoming_messages(size_t idx_from, size_t idx_to)
00637 throw(ZmqErrorType)
00638 {
00639 if (!incoming_)
00640 {
00641 return;
00642 }
00643 bool copy = options_ & OutOptions::COPY_INCOMING;
00644 send_incoming_messages(*incoming_, copy, idx_from, idx_to);
00645 }
00646
00647 void
00648 Sink::send_incoming_messages(Multipart& multipart,
00649 bool copy, size_t idx_from, size_t idx_to)
00650 throw(ZmqErrorType)
00651 {
00652 size_t to = std::min(idx_to, multipart.parts_.size());
00653 for (size_t i = idx_from; i < to; ++i)
00654 {
00655 zmq::message_t* msg = multipart.parts_[i];
00656 if (!copy)
00657 {
00658 multipart.parts_[i] = 0;
00659 }
00660 send_one(msg, copy);
00661 }
00662 }
00663
00664 void
00665 Sink::relay_from(zmq::socket_t& relay_src)
00666 throw (ZmqErrorType)
00667 {
00668 while (has_more(relay_src))
00669 {
00670 MsgPtr cur_part(new zmq::message_t);
00671 recv_msg(relay_src, *cur_part);
00672 send_owned(cur_part.release());
00673 }
00674 }
00675
00676 Sink::~Sink()
00677 {
00678 try
00679 {
00680 flush();
00681 }
00682 catch (const ZmqErrorType& e)
00683 {
00684 ZMQMESSAGE_LOG_STREAM <<
00685 "Flushing outgoing message failed: " << e.what()
00686 << ZMQMESSAGE_LOG_TERM;
00687 }
00688 }
00689 }
00690
00691 #endif