00001
00009 #include <cstddef>
00010 #include <vector>
00011 #include <tr1/array>
00012 #include <string>
00013 #include <memory>
00014 #include <zmq.hpp>
00015
00016 #include <ZmqMessageFwd.hpp>
00017
00018 #include <zmqmessage/Config.hpp>
00019 #include <zmqmessage/DelObj.hpp>
00020 #include <zmqmessage/NonCopyable.hpp>
00021 #include <zmqmessage/MsgPtrVec.hpp>
00022 #include <zmqmessage/RawMessage.hpp>
00023
00024 #include <ZmqTools.hpp>
00025
00026 #ifndef ZMQMESSAGE_HPP_
00027 #define ZMQMESSAGE_HPP_
00028
00033 namespace ZmqMessage
00034 {
00040 ZMQMESSAGE_EXCEPTION_MACRO(MessageFormatError)
00041 ;
00042
00048 ZMQMESSAGE_EXCEPTION_MACRO(NoSuchPartError)
00049 ;
00050
00054 void
00055 send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00056 throw(ZmqErrorType);
00057
00058
00059
00060
00067 class SimpleRouting
00068 {
00069 protected:
00070 inline void
00071 receive_routing(zmq::socket_t& sock) {}
00072
00073 inline
00074 MsgPtrVec*
00075 get_routing()
00076 {
00077 return 0;
00078 }
00079
00080 inline
00081 void
00082 log_routing_received() const {}
00083 };
00084
00091 class XRouting
00092 {
00093 private:
00094 MsgPtrVec routing_;
00095
00096 protected:
00097 void
00098 receive_routing(zmq::socket_t& sock)
00099 throw (MessageFormatError, ZmqErrorType);
00100
00101 inline
00102 MsgPtrVec*
00103 get_routing()
00104 {
00105 return &routing_;
00106 }
00107
00108 void
00109 log_routing_received() const;
00110
00111 ~XRouting();
00112 };
00113
00117 class Multipart : private Private::NonCopyable
00118 {
00119 protected:
00120 MsgPtrVec parts_;
00121
00122 void
00123 check_has_part(size_t n) const throw(NoSuchPartError);
00124
00125 private:
00126
00127 void
00128 clear();
00129
00130 friend class Sink;
00131
00132 friend void
00133 send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00134 throw(ZmqErrorType);
00135
00136 public:
00140 template <typename T>
00141 class iterator
00142 {
00143 public:
00144 typedef T value_type;
00145 typedef T& reference;
00146 typedef T* pointer;
00147 typedef std::ptrdiff_t difference_type;
00148 typedef std::input_iterator_tag iterator_category;
00149
00150 private:
00151 explicit
00152 iterator(const MsgPtrVec& messages, bool binary_mode, bool end = false) :
00153 messages_(messages), idx_(end ? messages.size() : 0),
00154 binary_mode_(binary_mode)
00155 {
00156 set_cur();
00157 }
00158
00159 iterator(const MsgPtrVec& messages, size_t idx, bool binary_mode) :
00160 messages_(messages), idx_(idx), binary_mode_(binary_mode)
00161 {
00162 set_cur();
00163 }
00164
00165 friend class Multipart;
00166
00167 public:
00168 iterator
00169 operator++()
00170 {
00171 ++idx_;
00172 set_cur();
00173 return *this;
00174 }
00175
00176 iterator
00177 operator++(int)
00178 {
00179 iterator<T> ret_val(*this);
00180 ++(*this);
00181 return ret_val;
00182 }
00183
00184 const T
00185 operator*()
00186 {
00187 return cur_;
00188 }
00189
00190 bool
00191 operator==(const iterator<T>& rhs)
00192 {
00193 return equal(rhs);
00194 }
00195
00196 bool
00197 operator!=(const iterator<T>& rhs)
00198 {
00199 return !equal(rhs);
00200 }
00201
00202 private:
00203 const MsgPtrVec& messages_;
00204 size_t idx_;
00205 T cur_;
00206 const bool binary_mode_;
00207
00208 inline
00209 bool
00210 end() const
00211 {
00212 return idx_ >= messages_.size();
00213 }
00214
00215 bool
00216 equal(const iterator<T>& rhs) const
00217 {
00218 return end() && rhs.end();
00219 }
00220
00221 void
00222 set_cur();
00223 };
00224 public:
00225 virtual ~Multipart()
00226 {
00227 clear();
00228 }
00229
00235 Multipart*
00236 detach();
00237
00241 inline
00242 bool
00243 has_part(size_t idx)
00244 {
00245 return (parts_.size() > idx && parts_[idx] != 0);
00246 }
00247
00251 inline
00252 void
00253 reserve(size_t sz)
00254 {
00255 parts_.reserve(sz);
00256 }
00257
00268 template <typename T>
00269 inline
00270 iterator<T>
00271 begin(bool binary_mode = false) const
00272 {
00273 return iterator<T>(parts_, binary_mode);
00274 }
00275
00279 inline
00280 iterator<ZMQMESSAGE_STRING_CLASS>
00281 beginstr() const
00282 {
00283 return begin<ZMQMESSAGE_STRING_CLASS>();
00284 }
00285
00293 template <typename T>
00294 inline
00295 iterator<T>
00296 iter_at(size_t pos, bool binary_mode = false) const
00297 {
00298 return iterator<T>(parts_, pos, binary_mode);
00299 }
00300
00304 template <typename T>
00305 inline
00306 iterator<T>
00307 end() const
00308 {
00309 return iterator<T>(parts_, false, true);
00310 }
00311
00315 inline
00316 iterator<ZMQMESSAGE_STRING_CLASS>
00317 endstr() const
00318 {
00319 return end<ZMQMESSAGE_STRING_CLASS>();
00320 }
00321
00325 inline
00326 size_t
00327 size() const
00328 {
00329 return parts_.size();
00330 }
00331
00335 zmq::message_t&
00336 operator[](size_t i) throw (NoSuchPartError);
00337
00342 zmq::message_t*
00343 release(size_t i);
00344
00345 inline
00346 std::auto_ptr<zmq::message_t>
00347 release_ptr(size_t i)
00348 {
00349 return std::auto_ptr<zmq::message_t>(release(i));
00350 }
00351 };
00352
00359 Sink&
00360 NullMessage(Sink& out);
00361
00369 template<typename RoutingPolicy>
00370 Incoming<RoutingPolicy>&
00371 Skip(Incoming<RoutingPolicy>& in)
00372 {
00373 in.check_has_part(in.cur_extract_idx_);
00374 ++in.cur_extract_idx_;
00375 return in;
00376 }
00377
00383 Sink&
00384 Flush(Sink& out);
00385
00392 template<typename StreamAlike>
00393 StreamAlike&
00394 Binary(StreamAlike& out)
00395 {
00396 out.set_binary();
00397 return out;
00398 }
00399
00406 template<typename StreamAlike>
00407 StreamAlike&
00408 Text(StreamAlike& out)
00409 {
00410 out.set_text();
00411 return out;
00412 }
00413
00420 template <class RoutingPolicy>
00421 class Incoming :
00422 public Multipart, private RoutingPolicy
00423 {
00424 private:
00425
00426 zmq::socket_t& src_;
00427 bool is_terminal_;
00428 size_t cur_extract_idx_;
00429 bool binary_mode_;
00430
00431 void
00432 append_message_data(
00433 zmq::message_t& message, std::vector<char>& area) const;
00434
00439 bool
00440 receive_one() throw(ZmqErrorType);
00441
00442 template <class OutRoutingPolicy>
00443 friend class Outgoing;
00444
00445 inline
00446 MsgPtrVec*
00447 get_routing()
00448 {
00449 return RoutingPolicy::get_routing();
00450 }
00451
00452 friend
00453 Incoming<RoutingPolicy>&
00454 Skip<RoutingPolicy>(Incoming<RoutingPolicy>&);
00455
00456 public:
00457 typedef RoutingPolicy RoutingPolicyType;
00458
00459 explicit Incoming(zmq::socket_t& sock)
00460 : src_(sock), is_terminal_(false),
00461 cur_extract_idx_(0), binary_mode_(false)
00462 {
00463 }
00464
00468 inline
00469 zmq::socket_t&
00470 src()
00471 {
00472 return src_;
00473 }
00474
00479 inline
00480 bool
00481 is_terminal() const
00482 {
00483 return is_terminal_;
00484 }
00485
00490 void
00491 check_is_terminal() const throw(MessageFormatError);
00492
00502 void
00503 validate(
00504 const char* part_names[],
00505 size_t part_names_length, bool strict)
00506 throw (MessageFormatError);
00507
00517 Incoming <RoutingPolicy>&
00518 receive(
00519 size_t parts, const char* part_names[],
00520 size_t part_names_length, bool check_terminal)
00521 throw (MessageFormatError, ZmqErrorType);
00522
00526 template <size_t N>
00527 inline
00528 Incoming <RoutingPolicy>&
00529 receive(
00530 std::tr1::array<const char*, N> part_names, bool check_terminal)
00531 throw (MessageFormatError)
00532 {
00533 return receive(N, part_names.data(), check_terminal);
00534 }
00535
00540 inline
00541 Incoming <RoutingPolicy>&
00542 receive(size_t parts, bool check_terminal)
00543 throw (MessageFormatError, ZmqErrorType)
00544 {
00545 return receive(parts, 0, 0, check_terminal);
00546 }
00547
00552 inline
00553 Incoming <RoutingPolicy>&
00554 receive(
00555 size_t parts, const char* part_names[], bool check_terminal)
00556 throw (MessageFormatError, ZmqErrorType)
00557 {
00558 return receive(parts, part_names, parts, check_terminal);
00559 }
00560
00566 Incoming <RoutingPolicy>&
00567 receive_all(
00568 const size_t min_parts,
00569 const char* part_names[], size_t part_names_length)
00570 throw (MessageFormatError, ZmqErrorType);
00571
00576 inline
00577 Incoming <RoutingPolicy>&
00578 receive_all(const size_t min_parts, const char* part_names[])
00579 throw (MessageFormatError, ZmqErrorType)
00580 {
00581 return receive_all(min_parts, part_names, min_parts);
00582 }
00583
00588 inline
00589 Incoming <RoutingPolicy>&
00590 receive_all()
00591 throw (MessageFormatError, ZmqErrorType)
00592 {
00593 return receive_all(0, 0, 0);
00594 }
00595
00600 inline
00601 Incoming <RoutingPolicy>&
00602 receive_all(const size_t min_parts)
00603 throw (MessageFormatError, ZmqErrorType)
00604 {
00605 return receive_all(min_parts, 0, 0);
00606 }
00607
00613 Incoming <RoutingPolicy>&
00614 receive_up_to(size_t min_parts, const char* part_names[],
00615 size_t max_parts) throw (MessageFormatError, ZmqErrorType);
00616
00626 int
00627 fetch_tail(std::vector<char>& area, const char* delimiter = 0)
00628 throw (ZmqErrorType);
00629
00635 int
00636 drop_tail() throw(ZmqErrorType);
00637
00644 template <typename T>
00645 Incoming<RoutingPolicy>&
00646 operator>> (T& t) throw(NoSuchPartError);
00647
00652 Incoming<RoutingPolicy>&
00653 operator>> (zmq::message_t& msg) throw(NoSuchPartError);
00654
00658 Incoming<RoutingPolicy>&
00659 operator>> (Incoming<RoutingPolicy>& (*f)(Incoming<RoutingPolicy>&))
00660 {
00661 return f(*this);
00662 }
00663
00669 void
00670 set_binary()
00671 {
00672 binary_mode_ = true;
00673 }
00674
00681 void
00682 set_text()
00683 {
00684 binary_mode_ = false;
00685 }
00686 };
00687
00694 struct OutOptions
00695 {
00699 static const unsigned NONBLOCK = 0x1;
00703 static const unsigned CACHE_ON_BLOCK = 0x2;
00707 static const unsigned DROP_ON_BLOCK = 0x4;
00711 static const unsigned COPY_INCOMING = 0x8;
00712
00716 static const unsigned EMULATE_BLOCK_SENDS = 0x10;
00717
00723 static const unsigned BINARY_MODE = 0x20;
00724
00725 zmq::socket_t& sock;
00726 unsigned options;
00727
00728 inline OutOptions(zmq::socket_t& sock_p, unsigned options_p) :
00729 sock(sock_p), options(options_p)
00730 {}
00731 };
00732
00742 class Sink : private Private::NonCopyable
00743 {
00744 public:
00749 template <typename T>
00750 class iterator
00751 {
00752 public:
00753 typedef T value_type;
00754 typedef T& reference;
00755 typedef T* pointer;
00756 typedef std::ptrdiff_t difference_type;
00757 typedef std::output_iterator_tag iterator_category;
00758
00759 private:
00760 class AssignProxy
00761 {
00762 public:
00763 AssignProxy(Sink& outgoing)
00764 : outgoing_(outgoing)
00765 {
00766 }
00767
00768 void
00769 operator=(const T& output_object)
00770 {
00771 outgoing_ << output_object;
00772 }
00773
00774 private:
00775 Sink& outgoing_;
00776 };
00777
00778 public:
00779 explicit
00780 iterator(Sink& outgoing)
00781 : outgoing_(outgoing)
00782 {
00783 }
00784
00785 iterator
00786 operator++()
00787 {
00788 return *this;
00789 }
00790
00791 iterator
00792 operator++(int)
00793 {
00794 return *this;
00795 }
00796
00797 AssignProxy
00798 operator*()
00799 {
00800 return AssignProxy(outgoing_);
00801 }
00802
00803 bool
00804 operator==(const iterator<T>& rhs)
00805 {
00806 return equal(rhs);
00807 }
00808
00809 bool
00810 operator!=(const iterator<T>& rhs)
00811 {
00812 return !equal(rhs);
00813 }
00814
00815 private:
00816 Sink& outgoing_;
00817
00818 bool
00819 equal(const iterator<T>& rhs) const
00820 {
00821 return false;
00822 }
00823 };
00824
00825 private:
00826 typedef std::auto_ptr<zmq::message_t> MsgPtr;
00827
00828 zmq::socket_t& dst_;
00829
00830 unsigned options_;
00831
00837 Multipart* incoming_;
00838
00839 std::auto_ptr<Multipart> outgoing_queue_;
00840
00845 MsgPtr cached_;
00846
00847 enum State
00848 {
00849 NOTSENT,
00850 SENDING,
00851 QUEUEING,
00852 DROPPING,
00853 FLUSHED
00854 };
00855
00856 State state_;
00857
00858 protected:
00859 Sink(zmq::socket_t& dst, unsigned options, Multipart* incoming = 0) :
00860 dst_(dst), options_(options), incoming_(incoming),
00861 outgoing_queue_(0), cached_(0), state_(NOTSENT)
00862 {}
00863
00864 inline
00865 unsigned
00866 options() const
00867 {
00868 return options_;
00869 }
00870
00871 void
00872 send_one(
00873 zmq::message_t* msg, bool use_copy = false)
00874 throw(ZmqErrorType);
00875
00876 private:
00877 void
00878 send_owned(zmq::message_t* owned) throw(ZmqErrorType);
00879
00880 void
00881 do_send_one(
00882 zmq::message_t* msg, bool last) throw(ZmqErrorType);
00883
00884 bool
00885 try_send_first_cached(
00886 bool last) throw(ZmqErrorType);
00887
00888 void
00889 add_to_queue(zmq::message_t* msg);
00890
00891 public:
00892 virtual
00893 ~Sink();
00894
00899 const Multipart*
00900 incoming() const
00901 {
00902 return incoming_;
00903 }
00904
00909 inline
00910 Multipart*
00911 detach()
00912 {
00913 return outgoing_queue_.release();
00914 }
00915
00919 inline
00920 bool
00921 is_queued() const
00922 {
00923 return (outgoing_queue_.get() != 0);
00924 }
00925
00930 inline
00931 bool
00932 is_dropping() const
00933 {
00934 return (state_ == DROPPING);
00935 }
00936
00940 inline
00941 zmq::socket_t&
00942 dst()
00943 {
00944 return dst_;
00945 }
00946
00950 void
00951 flush() throw(ZmqErrorType);
00952
00953 void
00954 set_binary()
00955 {
00956 options_ |= OutOptions::BINARY_MODE;
00957 }
00958
00959 void
00960 set_text()
00961 {
00962 options_ &= ~OutOptions::BINARY_MODE;
00963 }
00964
00969 void
00970 send_incoming_messages(size_t idx_from = 0, size_t idx_to = UINT_MAX)
00971 throw(ZmqErrorType);
00972
00977 void
00978 send_incoming_messages(Multipart& multipart,
00979 bool copy, size_t idx_from = 0, size_t idx_to = UINT_MAX)
00980 throw(ZmqErrorType);
00981
00985 void
00986 relay_from(zmq::socket_t& relay_src) throw(ZmqErrorType);
00987
00993 template <class OccupationAccumulator>
00994 void
00995 relay_from(
00996 zmq::socket_t& relay_src, OccupationAccumulator acc)
00997 throw (ZmqErrorType);
00998
01003 template <typename T>
01004 Sink&
01005 operator<< (const T& t) throw (ZmqErrorType);
01006
01015 Sink&
01016 operator<< (zmq::message_t& msg) throw (ZmqErrorType);
01017
01023 inline Sink&
01024 operator<< (MsgPtr msg) throw (ZmqErrorType)
01025 {
01026 send_owned(msg.get() ? msg.release() : new zmq::message_t(0));
01027 return *this;
01028 }
01029
01033 Sink&
01034 operator<< (const RawMessage& m) throw (ZmqErrorType);
01035
01039 inline Sink&
01040 operator<< (Sink& (*f)(Sink&))
01041 {
01042 return f(*this);
01043 }
01044 };
01045
01062 template <class RoutingPolicy>
01063 class Outgoing : public Sink
01064 {
01065 private:
01066
01067 void
01068 send_routing(
01069 MsgPtrVec* routing) throw(ZmqErrorType);
01070
01071 public:
01072
01073 using Sink::iterator;
01074
01078 typedef RoutingPolicy RoutingPolicyType;
01079
01080 Outgoing(zmq::socket_t& dst, unsigned options) :
01081 Sink(dst, options)
01082 {
01083 send_routing(0);
01084 }
01085
01086 explicit Outgoing(OutOptions out_opts) :
01087 Sink(out_opts.sock, out_opts.options)
01088 {
01089 send_routing(0);
01090 }
01091
01096 template <typename InRoutingPolicy>
01097 Outgoing(zmq::socket_t& dst,
01098 Incoming<InRoutingPolicy>& incoming,
01099 unsigned options) throw(ZmqErrorType) :
01100 Sink(dst, options, &incoming)
01101 {
01102 send_routing(incoming.get_routing());
01103 }
01104
01109 template <typename InRoutingPolicy>
01110 Outgoing(OutOptions out_opts,
01111 Incoming<InRoutingPolicy>& incoming) throw(ZmqErrorType) :
01112 Sink(out_opts.sock, out_opts.options, &incoming)
01113 {
01114 send_routing(incoming.get_routing());
01115 }
01116
01121 Outgoing(zmq::socket_t& dst,
01122 Multipart& incoming,
01123 unsigned options) throw(ZmqErrorType) :
01124 Sink(dst, options, &incoming)
01125 {
01126 send_routing(0);
01127 }
01128
01133 Outgoing(OutOptions out_opts,
01134 Multipart& incoming) throw(ZmqErrorType) :
01135 Sink(out_opts.sock, out_opts.options, &incoming)
01136 {
01137 send_routing(0);
01138 }
01139 };
01140 }
01141
01142 #endif
01143
01144 #include "zmqmessage/ZmqMessageTemplateImpl.hpp"