00001
00011 #include <functional>
00012 #include <algorithm>
00013
00014 #include "zmqmessage/TypeCheck.hpp"
00015
00016 #ifndef ZMQMESSAGE_ZMQMESSAGETEMPLATEIMPL_HPP_
00017 #define ZMQMESSAGE_ZMQMESSAGETEMPLATEIMPL_HPP_
00018
00020
00022
00023 namespace ZmqMessage
00024 {
00025 template <typename T>
00026 void
00027 Multipart::iterator<T>::set_cur()
00028 {
00029 if (end())
00030 {
00031 return;
00032 }
00033 if (messages_[idx_] == 0)
00034 {
00035 cur_ = T();
00036 return;
00037 }
00038
00039 get(*(messages_[idx_]), cur_, binary_mode_);
00040 }
00041
00042 template <class RoutingPolicy>
00043 template <typename T>
00044 Incoming<RoutingPolicy>&
00045 Incoming<RoutingPolicy>::operator>> (T& t) throw(NoSuchPartError)
00046 {
00047 check_has_part(cur_extract_idx_);
00048 get(*(parts_[cur_extract_idx_++]), t, binary_mode_);
00049 return *this;
00050 }
00051
00052 template <typename T>
00053 Sink&
00054 Sink::operator<< (const T& t) throw (ZmqErrorType)
00055 {
00056 MsgPtr msg(new zmq::message_t);
00057 bool binary_mode = options_ & OutOptions::BINARY_MODE;
00058 init_msg(t, *msg, binary_mode);
00059 send_owned(msg.release());
00060 return *this;
00061 }
00062
00063 template <class OccupationAccumulator>
00064 void
00065 Sink::relay_from(
00066 zmq::socket_t& relay_src, OccupationAccumulator acc)
00067 throw (ZmqErrorType)
00068 {
00069 while (has_more(relay_src))
00070 {
00071 MsgPtr cur_part(new zmq::message_t);
00072 recv_msg(relay_src, *cur_part);
00073 size_t sz = cur_part->size();
00074 acc(sz);
00075 send_owned(cur_part.release());
00076 }
00077 }
00078 }
00079
00080 #endif