Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

include/zmqmessage/ZmqMessageFullImpl.hpp

Go to the documentation of this file.
00001 
00013 #ifndef ZMQMESSAGE_ZMQMESSAGEFULLIMPL_HPP_
00014 #define ZMQMESSAGE_ZMQMESSAGEFULLIMPL_HPP_
00015 
00016 #include <tr1/functional>
00017 
00018 namespace ZmqMessage
00019 {
00020   void
00021   send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00022     throw(ZmqErrorType)
00023   {
00024     int base_flags = nonblock ? ZMQ_NOBLOCK : 0;
00025     for (size_t i = 0; i < multipart.size(); ++i)
00026     {
00027       int flags = base_flags | ((i < multipart.size()-1) ? ZMQ_SNDMORE : 0);
00028       send_msg(sock, *(multipart.parts_[i]), flags);
00029     }
00030   }
00031 
00032   void
00033   Multipart::clear()
00034   {
00035     std::for_each(
00036         parts_.begin(), parts_.end(),
00037         &Private::del_obj_not_null<zmq::message_t>
00038     );
00039     parts_.clear();
00040   }
00041 
00042   Multipart*
00043   Multipart::detach()
00044   {
00045     std::auto_ptr<Multipart> m(new Multipart());
00046     m->reserve(parts_.size());
00047     m->parts_ = parts_; //copy pointers
00048     parts_.assign(parts_.size(), 0);
00049     return m.release();
00050   }
00051 
00052   void
00053   Multipart::check_has_part(size_t n) const throw(NoSuchPartError)
00054   {
00055     if (n >= size())
00056     {
00057       std::ostringstream ss;
00058       ss << "Multipart zmq message has only " << size() <<
00059           " parts received, but we requested index: " << n;
00060       throw NoSuchPartError(ss.str());
00061     }
00062     if (parts_[n] == 0)
00063     {
00064       std::ostringstream ss;
00065       ss << "Multipart zmq message at " << n <<
00066           " is not owned by this multipart";
00067       throw NoSuchPartError(ss.str());
00068     }
00069   }
00070 
00071   zmq::message_t&
00072   Multipart::operator[](size_t i) throw (NoSuchPartError)
00073   {
00074     check_has_part(i);
00075     return *(parts_[i]);
00076   }
00077 
00078   zmq::message_t*
00079   Multipart::release(size_t i)
00080   {
00081     if (i >= size())
00082     {
00083       return 0;
00084     }
00085     zmq::message_t* p = parts_[i];
00086     parts_[i] = 0;
00087     return p;
00088   }
00089 
00090   void
00091   XRouting::log_routing_received() const
00092   {
00093     ZMQMESSAGE_LOG_STREAM << "Receiving multipart, route received: "
00094       << routing_.size() << " parts";
00095   }
00096 
00097   void
00098   XRouting::receive_routing(zmq::socket_t& sock)
00099     throw (MessageFormatError, ZmqErrorType)
00100   {
00101     if (!routing_.empty())
00102     {
00103       return;
00104     }
00105     routing_.reserve(3); //should be enough for most cases
00106     for (int i = 0; ; ++i)
00107     {
00108       zmq::message_t* msg = new zmq::message_t;
00109       routing_.push_back(msg);
00110 
00111       recv_msg(sock, *msg);
00112       ZMQMESSAGE_LOG_STREAM << "Received X route: " << msg->size() << "bytes;"
00113         << ZMQMESSAGE_LOG_TERM;
00114 
00115       if (msg->size() == 0)
00116       {
00117         break;
00118       }
00119       if (!has_more(sock))
00120       {
00121         std::ostringstream ss;
00122         ss << "Receiving multipart message: reading route info failed: "
00123             << "part " << (i+1) << " has nothing after it. "
00124             << "Routing info doesn't end with null message";
00125         throw MessageFormatError(ss.str());
00126       }
00127     }
00128   }
00129 
00130   XRouting::~XRouting()
00131   {
00132     std::for_each(
00133         routing_.begin(), routing_.end(),
00134         &Private::del_obj_not_null<zmq::message_t>
00135     );
00136   }
00137 
00138   template <class RoutingPolicy>
00139   void
00140   Incoming<RoutingPolicy>::check_is_terminal() const throw(MessageFormatError)
00141   {
00142     if (!is_terminal_)
00143     {
00144       std::ostringstream ss;
00145       ss <<
00146         "Receiving multipart "
00147         "Has more messages after part " << size() << ", but must be terminal";
00148       throw MessageFormatError(ss.str());
00149     }
00150   }
00151 
00152   template <class RoutingPolicy>
00153   void
00154   Incoming<RoutingPolicy>::append_message_data(
00155     zmq::message_t& message, std::vector<char>& area) const
00156   {
00157     std::vector<char>::size_type sz = area.size();
00158     area.resize(sz + message.size());
00159     std::copy(static_cast<char*>(message.data()),
00160               static_cast<char*>(message.data()) + message.size(),
00161               area.begin() + sz
00162     );
00163   }
00164 
00165   template <class RoutingPolicy>
00166   bool
00167   Incoming<RoutingPolicy>::receive_one() throw(ZmqErrorType)
00168   {
00169     parts_.push_back(new zmq::message_t);
00170     zmq::message_t& cur_part = *(parts_.back());
00171 
00172     recv_msg(src_, cur_part);
00173     bool more = has_more(src_);
00174 
00175     ZMQMESSAGE_LOG_STREAM << "Incoming received "
00176       << cur_part.size() << " bytes: "
00177       << ZMQMESSAGE_STRING_CLASS((const char*)cur_part.data(),
00178         std::min(cur_part.size(), static_cast<size_t>(256)))
00179       << ", has more = " << more << ZMQMESSAGE_LOG_TERM;
00180     return more;
00181   }
00182 
00183   template <class RoutingPolicy>
00184   void
00185   Incoming<RoutingPolicy>::validate(
00186     const char* part_names[],
00187     size_t part_names_length, bool strict)
00188     throw (MessageFormatError)
00189   {
00190     if (parts_.size() < part_names_length)
00191     {
00192       std::ostringstream ss;
00193       ss <<
00194         "Validating received multipart: "
00195         "No more messages after " << part_names[parts_.size() - 1] <<
00196         "(" << (parts_.size()) << "), expected " <<
00197         (strict ? "exactly" : "at least") << " " << part_names_length;
00198       throw MessageFormatError(ss.str());
00199     }
00200     if (strict && parts_.size() > part_names_length)
00201     {
00202       std::ostringstream ss;
00203       ss <<
00204         "Validating received multipart: "
00205         "Have received " << parts_.size() << " parts, while expected "
00206         "exactly " << part_names_length;
00207       throw MessageFormatError(ss.str());
00208     }
00209   }
00210 
00211   template <class RoutingPolicy>
00212   Incoming <RoutingPolicy>&
00213   Incoming<RoutingPolicy>::receive(
00214       size_t parts, const char* part_names[],
00215       size_t part_names_length, bool check_terminal)
00216       throw (MessageFormatError, ZmqErrorType)
00217   {
00218     RoutingPolicy::receive_routing(src_);
00219     RoutingPolicy::log_routing_received();
00220 
00221     for (size_t i = 0, init_parts = size(); i < parts; ++i)
00222     {
00223       bool more = receive_one();
00224       const char* const part_name =
00225         (i < part_names_length) ? part_names[i] : "<unnamed>";
00226 
00227       if (i < parts - 1 && !more)
00228       {
00229         is_terminal_ = true;
00230         std::ostringstream ss;
00231         ss <<
00232           "Receiving multipart: "
00233           "No more messages after " << part_name <<
00234           "(" << (init_parts + i) << "), expected more";
00235         throw MessageFormatError(ss.str());
00236       }
00237       if (i == parts - 1 && more)
00238       {
00239         is_terminal_= false;
00240         if (check_terminal)
00241         {
00242           std::ostringstream ss;
00243           ss <<
00244             "Receiving multipart: "
00245             "Has more messages after " << part_name <<
00246             "(" << (init_parts + i) << "), expected no more messages";
00247           throw MessageFormatError(ss.str());
00248         }
00249       }
00250       if (i == parts - 1 && !more)
00251       {
00252         is_terminal_ = true;
00253       }
00254     }
00255     return *this;
00256   }
00257 
00258   template <class RoutingPolicy>
00259   Incoming <RoutingPolicy>&
00260   Incoming<RoutingPolicy>::receive_all(
00261       size_t min_parts, const char* part_names[], size_t part_names_length)
00262       throw (MessageFormatError, ZmqErrorType)
00263   {
00264     receive(min_parts, part_names, part_names_length, false);
00265 
00266     while (!is_terminal_)
00267     {
00268       is_terminal_ = !receive_one();
00269     }
00270 
00271     return *this;
00272   }
00273 
00274   template <class RoutingPolicy>
00275   Incoming <RoutingPolicy>&
00276   Incoming<RoutingPolicy>::receive_up_to(
00277     size_t min_parts,
00278     const char* part_names[], size_t max_parts)
00279     throw (MessageFormatError, ZmqErrorType)
00280   {
00281     receive(min_parts, part_names, false);
00282 
00283     for (size_t n = min_parts; n < max_parts && !is_terminal_; ++n)
00284     {
00285       is_terminal_ = !receive_one();
00286     }
00287 
00288     return *this;
00289   }
00290 
00291   template <class RoutingPolicy>
00292   int
00293   Incoming<RoutingPolicy>::fetch_tail(
00294       std::vector<char>& area, const char* delimiter) throw (ZmqErrorType)
00295   {
00296     append_message_data(*(parts_.back()), area);
00297     if (is_terminal_)
00298     {
00299       return 1;
00300     }
00301 
00302     size_t delim_sz = (delimiter) ? ::strlen(delimiter) : 0;
00303 
00304     int num_messages = 1;
00305     for (; has_more(src_); ++num_messages)
00306     {
00307       if (delim_sz)
00308       {
00309         std::vector<char>::size_type sz = area.size();
00310         area.resize(sz + delim_sz);
00311         std::copy(delimiter, delimiter + delim_sz, area.begin() + sz);
00312       }
00313       zmq::message_t data_buff;
00314       recv_msg(src_, data_buff);
00315       append_message_data(data_buff, area);
00316     }
00317     return num_messages;
00318   }
00319 
00320   template <class RoutingPolicy>
00321   int
00322   Incoming<RoutingPolicy>::drop_tail() throw(ZmqErrorType)
00323   {
00324     if (is_terminal_)
00325     {
00326       return 0;
00327     }
00328 
00329     zmq::message_t data_buff;
00330     int num_messages = 0;
00331     if (parts_.empty()) //we haven't received anything
00332     {
00333       if (!try_recv_msg(src_, data_buff, ZMQ_NOBLOCK))
00334       {
00335         return 0;
00336       }
00337       num_messages = 1;
00338     }
00339 
00340     for (; has_more(src_); ++num_messages)
00341     {
00342       recv_msg(src_, data_buff);
00343     }
00344     return num_messages;
00345   }
00346 
00347   template <class RoutingPolicy>
00348   Incoming<RoutingPolicy>&
00349   Incoming<RoutingPolicy>::operator>> (
00350     zmq::message_t& msg) throw(NoSuchPartError)
00351   {
00352     check_has_part(cur_extract_idx_);
00353     copy_msg(msg, *(parts_[cur_extract_idx_++]));
00354     return *this;
00355   }
00356 
00357   Sink&
00358   NullMessage(Sink& out)
00359   {
00360     out << *(new zmq::message_t(0));
00361     return out;
00362   }
00363 
00364   Sink&
00365   Flush(Sink& out)
00366   {
00367     out.flush();
00368     return out;
00369   }
00370 
00371   template <>
00372   void
00373   Outgoing<SimpleRouting>::send_routing(
00374     MsgPtrVec* routing) throw (ZmqErrorType)
00375   {
00376   }
00377 
00378   template <>
00379   void
00380   Outgoing<XRouting>::send_routing(
00381     MsgPtrVec* routing) throw (ZmqErrorType)
00382   {
00383     if (routing == 0)
00384     {
00385       ZMQMESSAGE_LOG_STREAM <<
00386         "X route: route vector is empty, send null message only"
00387         << ZMQMESSAGE_LOG_TERM;
00388       send_one(new zmq::message_t, false);
00389     }
00390     else
00391     {
00392       bool copy = options() & OutOptions::COPY_INCOMING;
00393       for (MsgPtrVec::iterator it = routing->begin();
00394           it != routing->end(); ++it)
00395       {
00396         zmq::message_t* msg = *it;
00397         if (!copy)
00398         {
00399           *it = 0;
00400         }
00401         send_one(msg, copy);
00402       }
00403     }
00404   }
00405 
00406   Sink&
00407   Sink::operator<< (zmq::message_t& msg)
00408     throw (ZmqErrorType)
00409   {
00410     bool copy_mode = options_ & OutOptions::COPY_INCOMING;
00411     bool use_copy = false;
00412     if (incoming_)
00413     {
00414       MsgPtrVec::iterator it = std::find(
00415         incoming_->parts_.begin(), incoming_->parts_.end(), &msg);
00416       if (it != incoming_->parts_.end())
00417       {
00418         use_copy = copy_mode;
00419         if (!copy_mode)
00420         {
00421           *it = 0;
00422         }
00423       }
00424     }
00425     send_one(&msg, use_copy);
00426     return *this;
00427   }
00428 
00429   Sink&
00430   Sink::operator<< (const RawMessage& m)
00431     throw (ZmqErrorType)
00432   {
00433     if (m.deleter)
00434     {
00435       send_owned(new zmq::message_t(m.data.ptr, m.sz, m.deleter, 0));
00436     }
00437     else
00438     {
00439       MsgPtr msg(new zmq::message_t);
00440       init_msg(m.data.cptr, m.sz, *msg);
00441       send_owned(msg.release());
00442     }
00443     return *this;
00444   }
00445 
00446   void
00447   Sink::add_to_queue(
00448     zmq::message_t* msg)
00449   {
00450     outgoing_queue_->parts_.push_back(msg);
00451   }
00452 
00453   void
00454   Sink::do_send_one(
00455     zmq::message_t* msg, bool last)
00456     throw (ZmqErrorType)
00457   {
00458     int flag = 0;
00459     if (!last) flag |= ZMQ_SNDMORE;
00460     if (options_ & OutOptions::NONBLOCK) flag |= ZMQ_NOBLOCK;
00461 
00462     ZMQMESSAGE_LOG_STREAM
00463       << "Outgoing sending msg, " << msg->size() << " bytes: "
00464       << ZMQMESSAGE_STRING_CLASS((const char*)msg->data(),
00465         std::min(msg->size(), static_cast<size_t>(256)))
00466       << ", flag = " << flag << ZMQMESSAGE_LOG_TERM;
00467 
00468     send_msg(dst_, *msg, flag);
00469   }
00470 
00471   bool
00472   Sink::try_send_first_cached(bool last)
00473     throw(ZmqErrorType)
00474   {
00475     assert(state_ == NOTSENT);
00476     assert(cached_.get());
00477     bool ret = true;
00478     try
00479     {
00480       if (options_ & OutOptions::EMULATE_BLOCK_SENDS)
00481       {
00482         errno = EAGAIN;
00483         ZMQMESSAGE_LOG_STREAM
00484           << "Emulating blocking send!" << ZMQMESSAGE_LOG_TERM;
00485         throw_zmq_error("Emulating blocking send");
00486       }
00487       else
00488       {
00489         do_send_one(cached_.get(), last);
00490         state_ = SENDING;
00491       }
00492     }
00493     catch (const ZmqErrorType& e)
00494     {
00495       ret = false;
00496       if (errno == EAGAIN)
00497       {
00498         if (options_ & OutOptions::CACHE_ON_BLOCK)
00499         {
00500           ZMQMESSAGE_LOG_STREAM
00501             << "Cannot send first outgoing message: would block: start caching"
00502             << ZMQMESSAGE_LOG_TERM;
00503           state_ = QUEUEING;
00504           outgoing_queue_.reset(new Multipart);
00505           add_to_queue(cached_.release());
00506         }
00507         else if (options_ & OutOptions::DROP_ON_BLOCK)
00508         {
00509           ZMQMESSAGE_LOG_STREAM <<
00510             "Cannot send first outgoing message: would block: dropping" <<
00511             ZMQMESSAGE_LOG_TERM;
00512           state_ = DROPPING;
00513         }
00514         else
00515         {
00516           throw;
00517         }
00518       }
00519       else
00520       {
00521         cached_.reset(0);
00522         ZMQMESSAGE_LOG_STREAM <<
00523           "Cannot send first outgoing message: error: " << e.what() <<
00524           ZMQMESSAGE_LOG_TERM;
00525 
00526         throw;
00527       }
00528     }
00529     return ret;
00530   }
00531 
00532   void
00533   Sink::send_one(
00534     zmq::message_t* msg, bool use_copy) throw(ZmqErrorType)
00535   {
00536     MsgPtr p(0);
00537     if (use_copy)
00538     {
00539       p.reset(new zmq::message_t);
00540       copy_msg(*p, *msg);
00541     }
00542     else
00543     {
00544       p.reset(msg);
00545     }
00546     send_owned(p.release());
00547   }
00548 
00549   void
00550   Sink::send_owned(
00551     zmq::message_t* owned) throw(ZmqErrorType)
00552   {
00553     MsgPtr msg(owned);
00554     switch (state_)
00555     {
00556     case NOTSENT:
00557       if (!cached_.get())
00558       {
00559         cached_ = msg;
00560         break;
00561       }
00562       if (try_send_first_cached(false))
00563       {
00564         cached_ = msg;
00565       }
00566       else
00567       {
00568         if (state_ == QUEUEING)
00569         {
00570           add_to_queue(msg.release());
00571         }
00572       }
00573 
00574       break;
00575     case SENDING:
00576       if (cached_.get())
00577       {
00578         do_send_one(cached_.get(), false);
00579         cached_ = msg;
00580       }
00581       else
00582       {
00583         ZMQMESSAGE_LOG_STREAM <<
00584           "Outgoing message in state SENDING, no messages cached yet - strange"
00585           << ZMQMESSAGE_LOG_TERM;
00586         cached_ = msg;
00587       }
00588 
00589       break;
00590     case QUEUEING:
00591       assert(outgoing_queue_.get());
00592       add_to_queue(msg.release());
00593 
00594       break;
00595     case DROPPING:
00596       break;
00597     case FLUSHED:
00598       ZMQMESSAGE_LOG_STREAM << "trying to send a message in FLUSHED state"
00599         << ZMQMESSAGE_LOG_TERM;
00600       break;
00601     }
00602   }
00603 
00604   void
00605   Sink::flush() throw(ZmqErrorType)
00606   {
00607     if (state_ == DROPPING)
00608     {
00609       return;
00610     }
00611     if (!cached_.get())
00612     {
00613       state_ = FLUSHED;
00614       return;
00615     }
00616 
00617     //handle cached
00618     switch (state_)
00619     {
00620     case NOTSENT:
00621       try_send_first_cached(true);
00622       break;
00623     case SENDING:
00624       do_send_one(cached_.get(), true);
00625       break;
00626 
00627     default:
00628       //cannot be
00629       break;
00630     }
00631     cached_.reset(0);
00632     state_ = FLUSHED;
00633   }
00634 
00635   void
00636   Sink::send_incoming_messages(size_t idx_from, size_t idx_to)
00637     throw(ZmqErrorType)
00638   {
00639     if (!incoming_)
00640     {
00641       return;
00642     }
00643     bool copy = options_ & OutOptions::COPY_INCOMING;
00644     send_incoming_messages(*incoming_, copy, idx_from, idx_to);
00645   }
00646 
00647   void
00648   Sink::send_incoming_messages(Multipart& multipart,
00649     bool copy, size_t idx_from, size_t idx_to)
00650     throw(ZmqErrorType)
00651   {
00652     size_t to = std::min(idx_to, multipart.parts_.size());
00653     for (size_t i = idx_from; i < to; ++i)
00654     {
00655       zmq::message_t* msg = multipart.parts_[i];
00656       if (!copy)
00657       {
00658         multipart.parts_[i] = 0;
00659       }
00660       send_one(msg, copy);
00661     }
00662   }
00663 
00664   void
00665   Sink::relay_from(zmq::socket_t& relay_src)
00666     throw (ZmqErrorType)
00667   {
00668     while (has_more(relay_src))
00669     {
00670       MsgPtr cur_part(new zmq::message_t);
00671       recv_msg(relay_src, *cur_part);
00672       send_owned(cur_part.release());
00673     }
00674   }
00675 
00676   Sink::~Sink()
00677   {
00678     try
00679     {
00680       flush();
00681     }
00682     catch (const ZmqErrorType& e)
00683     {
00684       ZMQMESSAGE_LOG_STREAM <<
00685         "Flushing outgoing message failed: " << e.what()
00686         << ZMQMESSAGE_LOG_TERM;
00687     }
00688   }
00689 }
00690 
00691 #endif /* ZMQMESSAGE_ZMQMESSAGEFULLIMPL_HPP_ */

ZmqMessage 0.1 — open source software, support@zmqmessage.org