Overview Tutorial API reference Examples Build Download ZmqMessage 0.1 - 21 Oct 2011

include/ZmqMessage.hpp

Go to the documentation of this file.
00001 
00009 #include <cstddef>
      #include <climits>
00010 #include <vector>
00011 #include <tr1/array>
00012 #include <string>
00013 #include <memory>
      #include <iterator>
00014 #include <zmq.hpp>
00015 
00016 #include <ZmqMessageFwd.hpp>
00017 
00018 #include <zmqmessage/Config.hpp>
00019 #include <zmqmessage/DelObj.hpp>
00020 #include <zmqmessage/NonCopyable.hpp>
00021 #include <zmqmessage/MsgPtrVec.hpp>
00022 #include <zmqmessage/RawMessage.hpp>
00023 
00024 #include <ZmqTools.hpp>
00025 
00026 #ifndef ZMQMESSAGE_HPP_
00027 #define ZMQMESSAGE_HPP_
00028 
00033 namespace ZmqMessage
00034 {
00040   ZMQMESSAGE_EXCEPTION_MACRO(MessageFormatError)
00041   ;
        // Introduces the name MessageFormatError by expanding the project macro
        // ZMQMESSAGE_EXCEPTION_MACRO (the macro itself is defined outside this
        // header). Within this header the name is used only as an exception
        // type in exception specifications: XRouting::receive_routing and the
        // Incoming validate / check_is_terminal / receive* members.
00042 
00048   ZMQMESSAGE_EXCEPTION_MACRO(NoSuchPartError)
00049   ;
        // Introduces the name NoSuchPartError by expanding the project macro
        // ZMQMESSAGE_EXCEPTION_MACRO (the macro itself is defined outside this
        // header). Within this header the name is used only as an exception
        // type in exception specifications: Multipart::check_has_part,
        // Multipart::operator[] and the Incoming extraction operators.
00050 
00054   void
00055   send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00056     throw(ZmqErrorType);
        // Declaration only; the definition lives outside this header.
        // The exception specification allows nothing but ZmqErrorType to
        // escape. Multipart repeats this exact signature as a friend, so the
        // function has access to Multipart's private members.
        // NOTE(review): presumably transmits the parts of `multipart` over
        // `sock`, without blocking when `nonblock` is true - confirm against
        // the implementation.
00057 
00058 
00059   // routing policies
00060 
00067   class SimpleRouting
00068   {
          // Routing policy that keeps no routing state. All three hooks are
          // empty or trivial inline members, so a class built on this policy
          // (Incoming<RoutingPolicy> inherits from its policy privately)
          // stores nothing for routing and reports "no routing" to callers.
00069   protected:
          // Intentionally empty: `sock` is never read.
00070     inline void
00071     receive_routing(zmq::socket_t& sock) {}
00072 
          // Always returns a null pointer, i.e. there are no routing parts.
00073     inline
00074     MsgPtrVec*
00075     get_routing()
00076     {
00077       return 0;
00078     }
00079 
          // Intentionally empty: there is nothing to log.
00080     inline
00081     void
00082     log_routing_received() const {}
00083   };
00084 
00091   class XRouting
00092   {
          // Routing policy that owns a vector of routing parts (routing_).
          // It offers the same three hooks as SimpleRouting, but only
          // get_routing() is defined in this header; the remaining members
          // are declared here and implemented elsewhere.
00093   private:
          // Routing parts held by this policy; exposed via get_routing().
00094     MsgPtrVec routing_; 
00095 
00096   protected:
          // Declared to let only MessageFormatError or ZmqErrorType escape.
          // NOTE(review): presumably reads the routing envelope from `sock`
          // into routing_ - confirm in the implementation.
00097     void
00098     receive_routing(zmq::socket_t& sock)
00099       throw (MessageFormatError, ZmqErrorType);
00100 
          // Returns the address of the internal vector, never null. The
          // pointer stays valid only as long as this object lives.
00101     inline
00102     MsgPtrVec*
00103     get_routing()
00104     {
00105       return &routing_;
00106     }
00107 
          // Defined elsewhere.
00108     void
00109     log_routing_received() const;
00110 
          // Protected, non-virtual destructor defined elsewhere: the policy
          // cannot be destroyed through an XRouting pointer by outside code.
          // NOTE(review): presumably releases what routing_ refers to -
          // confirm in the implementation.
00111     ~XRouting();
00112   };
00113 
00117   class Multipart : private Private::NonCopyable
00118   {
          // Ordered collection of message parts (parts_) with typed input
          // iteration over them. Copying is disabled through the private
          // NonCopyable base. Sink and the free function send() are friends
          // and may therefore reach the private clear().
00119   protected:
          // The parts themselves. has_part() compares elements against 0, so
          // a slot may be empty.
00120     MsgPtrVec parts_;
00121 
          // Declared to let nothing but NoSuchPartError escape; defined
          // elsewhere. NOTE(review): presumably raises it when part `n` is
          // absent - confirm in the implementation.
00122     void
00123     check_has_part(size_t n) const throw(NoSuchPartError);
00124 
00125   private:
00126 
          // Defined elsewhere; invoked by the destructor.
00127     void
00128     clear();
00129 
00130     friend class Sink;
00131 
00132     friend void
00133     send(zmq::socket_t& sock, Multipart& multipart, bool nonblock)
00134       throw(ZmqErrorType);
00135 
00136   public:
          // Input iterator over the parts of a Multipart. The current value
          // (cur_, of type T) is refreshed by set_cur() after construction and
          // after every increment; set_cur() is defined elsewhere and
          // presumably converts the part at idx_ to T, honouring binary_mode_.
          // The iterator keeps a reference to the vector it walks, so it must
          // not outlive the owning Multipart. Only Multipart can create one
          // (private constructors plus friendship).
00140     template <typename T>
00141     class iterator
00142     {
00143     public:
00144       typedef T value_type;
00145       typedef T& reference;
00146       typedef T* pointer;
00147       typedef std::ptrdiff_t difference_type;
00148       typedef std::input_iterator_tag iterator_category;
00149 
00150     private:
            // Positions the iterator at the first part, or past the last one
            // when `end` is true.
00151       explicit
00152       iterator(const MsgPtrVec& messages, bool binary_mode, bool end = false) :
00153           messages_(messages), idx_(end ? messages.size() : 0),
00154           binary_mode_(binary_mode)
00155       {
00156         set_cur();
00157       }
00158 
            // Positions the iterator at an arbitrary index `idx`.
00159       iterator(const MsgPtrVec& messages, size_t idx, bool binary_mode) :
00160           messages_(messages), idx_(idx), binary_mode_(binary_mode)
00161       {
00162         set_cur();
00163       }
00164 
00165       friend class Multipart;
00166 
00167     public:
            // Pre-increment. Returns a reference to *this so that advancing
            // does not copy the iterator together with its cached value.
00168       iterator&
00169       operator++()
00170       {
00171         ++idx_;
00172         set_cur();
00173         return *this;
00174       }
00175 
            // Post-increment: returns a copy taken before advancing.
00176       iterator
00177       operator++(int)
00178       {
00179         iterator<T> ret_val(*this);
00180         ++(*this);
00181         return ret_val;
00182       }
00183 
            // Returns a copy of the cached current value.
00184       const T
00185       operator*() const
00186       {
00187         return cur_;
00188       }
00189 
            // Equality is defined by equal(): true only when BOTH iterators
            // are past the end. Two iterators at the same non-end index do
            // not compare equal.
00190       bool
00191       operator==(const iterator<T>& rhs) const
00192       {
00193         return equal(rhs);
00194       }
00195 
00196       bool
00197       operator!=(const iterator<T>& rhs) const
00198       {
00199         return !equal(rhs);
00200       }
00201 
00202     private:
00203       const MsgPtrVec& messages_;
00204       size_t idx_;
00205       T cur_;
00206       const bool binary_mode_;
00207 
            // True once idx_ has reached or passed the number of parts.
00208       inline
00209       bool
00210       end() const
00211       {
00212         return idx_ >= messages_.size();
00213       }
00214 
00215       bool
00216       equal(const iterator<T>& rhs) const
00217       {
00218         return end() && rhs.end();
00219       }
00220 
            // Defined elsewhere; refreshes cur_ for the current idx_.
00221       void
00222       set_cur();
00223     };
00224   public:
          // Virtual so that derived classes are destroyed correctly through a
          // Multipart pointer; disposes of the parts via clear().
00225     virtual ~Multipart()
00226     {
00227       clear();
00228     }
00229 
          // Defined elsewhere. NOTE(review): presumably moves the parts into
          // a newly allocated Multipart owned by the caller - confirm.
00235     Multipart*
00236     detach();
00237 
          // True when index `idx` is within range and the slot is non-null.
00241     inline
00242     bool
00243     has_part(size_t idx) const
00244     {
00245       return (parts_.size() > idx && parts_[idx] != 0);
00246     }
00247 
          // Forwards to parts_.reserve(sz).
00251     inline
00252     void
00253     reserve(size_t sz)
00254     {
00255       parts_.reserve(sz);
00256     }
00257 
          // Iterator positioned at the first part. `binary_mode` is stored in
          // the iterator for use by set_cur().
00268     template <typename T>
00269     inline
00270     iterator<T>
00271     begin(bool binary_mode = false) const
00272     {
00273       return iterator<T>(parts_, binary_mode);
00274     }
00275 
          // begin() specialised for ZMQMESSAGE_STRING_CLASS, text mode.
00279     inline
00280     iterator<ZMQMESSAGE_STRING_CLASS>
00281     beginstr() const
00282     {
00283       return begin<ZMQMESSAGE_STRING_CLASS>();
00284     }
00285 
          // Iterator positioned at index `pos`.
00293     template <typename T>
00294     inline
00295     iterator<T>
00296     iter_at(size_t pos, bool binary_mode = false) const
00297     {
00298       return iterator<T>(parts_, pos, binary_mode);
00299     }
00300 
          // Past-the-end iterator (binary_mode false, end flag true).
00304     template <typename T>
00305     inline
00306     iterator<T>
00307     end() const
00308     {
00309       return iterator<T>(parts_, false, true);
00310     }
00311 
          // end() specialised for ZMQMESSAGE_STRING_CLASS.
00315     inline
00316     iterator<ZMQMESSAGE_STRING_CLASS>
00317     endstr() const
00318     {
00319       return end<ZMQMESSAGE_STRING_CLASS>();
00320     }
00321 
          // Number of slots in parts_ (empty slots are counted too).
00325     inline
00326     size_t
00327     size() const
00328     {
00329       return parts_.size();
00330     }
00331 
          // Defined elsewhere; declared to let only NoSuchPartError escape.
00335     zmq::message_t&
00336     operator[](size_t i) throw (NoSuchPartError);
00337 
          // Defined elsewhere. release_ptr() below wraps the result in an
          // owning auto_ptr, so the returned message is the caller's to free.
00342     zmq::message_t*
00343     release(size_t i);
00344 
          // Same as release(i), with ownership expressed as std::auto_ptr.
00345     inline
00346     std::auto_ptr<zmq::message_t>
00347     release_ptr(size_t i)
00348     {
00349       return std::auto_ptr<zmq::message_t>(release(i));
00350     }
00351   };
00352 
00359   Sink&
00360   NullMessage(Sink& out);
        // Declaration only; defined elsewhere. The signature matches the
        // `Sink& (*f)(Sink&)` overload of Sink::operator<<, so the function
        // can be applied as a stream manipulator: `sink << NullMessage`.
        // NOTE(review): presumably appends an empty message part to `out` -
        // confirm in the implementation.
00361 
00369   template<typename RoutingPolicy>
00370   Incoming<RoutingPolicy>&
00371   Skip(Incoming<RoutingPolicy>& in)
00372   {
          // Manipulator for Incoming::operator>>: steps over the part at the
          // current extraction index without reading it. check_has_part()
          // runs first, so the index is advanced only if that call returns
          // (its exception specification names NoSuchPartError). Incoming
          // declares this function a friend, which is what grants access to
          // the private cur_extract_idx_. Returns `in` to allow chaining.
00373     in.check_has_part(in.cur_extract_idx_);
00374     ++in.cur_extract_idx_;
00375     return in;
00376   }
00377 
00383   Sink&
00384   Flush(Sink& out);
        // Declaration only; defined elsewhere. The signature matches the
        // `Sink& (*f)(Sink&)` overload of Sink::operator<<, so the function
        // can be applied as a stream manipulator: `sink << Flush`.
        // NOTE(review): presumably calls out.flush() - confirm in the
        // implementation.
00385 
00392   template<typename StreamAlike>
00393   StreamAlike&
00394   Binary(StreamAlike& out)
00395   {
          // Manipulator that switches `out` to binary mode by calling its
          // set_binary() member, then returns `out` for chaining. Works with
          // any type providing set_binary(); in this header both Incoming and
          // Sink do.
00396     out.set_binary();
00397     return out;
00398   }
00399 
00406   template<typename StreamAlike>
00407   StreamAlike&
00408   Text(StreamAlike& out)
00409   {
          // Manipulator that switches `out` to text mode by calling its
          // set_text() member, then returns `out` for chaining. Works with
          // any type providing set_text(); in this header both Incoming and
          // Sink do.
00410     out.set_text();
00411     return out;
00412   }
00413 
00420   template <class RoutingPolicy>
00421   class Incoming :
00422     public Multipart, private RoutingPolicy
00423   {
          // A Multipart bound to a source socket, parameterised by a routing
          // policy (inherited privately). It adds the receive* family, the
          // extraction operators (operator>>) and a text/binary mode flag.
          // Most members are only declared here; their definitions live
          // outside this header.
00424   private:
00425 
          // src_             - socket passed to the constructor
          // is_terminal_     - starts false; reported by is_terminal()
          // cur_extract_idx_ - index of the next part to extract; starts at
          //                    0 and is advanced by the Skip manipulator
          // binary_mode_     - starts false; toggled by set_binary/set_text
00426     zmq::socket_t& src_; 
00427     bool is_terminal_; 
00428     size_t cur_extract_idx_;
00429     bool binary_mode_; 
00430 
          // Defined elsewhere.
00431     void
00432     append_message_data(
00433       zmq::message_t& message, std::vector<char>& area) const;
00434 
          // Defined elsewhere; declared to let only ZmqErrorType escape.
00439     bool
00440     receive_one() throw(ZmqErrorType);
00441 
          // Every Outgoing<...> instantiation is a friend; its constructors
          // call the private get_routing() below.
00442     template <class OutRoutingPolicy>
00443     friend class Outgoing;
00444 
          // Forwards to the routing policy's get_routing().
00445     inline
00446     MsgPtrVec*
00447     get_routing()
00448     {
00449       return RoutingPolicy::get_routing();
00450     }
00451 
          // Lets the Skip manipulator touch cur_extract_idx_.
00452     friend
00453     Incoming<RoutingPolicy>&
00454     Skip<RoutingPolicy>(Incoming<RoutingPolicy>&);
00455 
00456   public:
00457     typedef RoutingPolicy RoutingPolicyType;
00458 
          // Binds to `sock` (held by reference, so the socket must outlive
          // this object) and resets all flags and the extraction index.
00459     explicit Incoming(zmq::socket_t& sock)
00460         : src_(sock), is_terminal_(false),
00461           cur_extract_idx_(0), binary_mode_(false)
00462     {
00463     }
00464 
          // The socket given at construction.
00468     inline
00469     zmq::socket_t&
00470     src()
00471     {
00472       return src_;
00473     }
00474 
          // Current value of the is_terminal_ flag.
00479     inline
00480     bool
00481     is_terminal() const
00482     {
00483       return is_terminal_;
00484     }
00485 
          // Defined elsewhere; declared to let only MessageFormatError escape.
00490     void
00491     check_is_terminal() const throw(MessageFormatError);
00492 
          // Defined elsewhere; declared to let only MessageFormatError escape.
00502     void
00503     validate(
00504       const char* part_names[],
00505       size_t part_names_length, bool strict)
00506       throw (MessageFormatError);
00507 
          // Primary receive overload, defined elsewhere. The inline overloads
          // below all end up here.
00517     Incoming <RoutingPolicy>&
00518     receive(
00519       size_t parts, const char* part_names[],
00520       size_t part_names_length, bool check_terminal)
00521       throw (MessageFormatError, ZmqErrorType);
00522 
          // Convenience overload taking the part names as a fixed-size array:
          // N doubles as the part count. The exception specification lists
          // ZmqErrorType as well, because the overload this forwards to is
          // declared to throw it; a narrower specification here would turn
          // such an exception into a call to std::unexpected().
00526     template <size_t N>
00527     inline
00528     Incoming <RoutingPolicy>&
00529     receive(
00530       std::tr1::array<const char*, N> part_names, bool check_terminal)
00531       throw (MessageFormatError, ZmqErrorType)
00532     {
00533       return receive(N, part_names.data(), check_terminal);
00534     }
00535 
          // Receive without part names (null array, zero length).
00540     inline
00541     Incoming <RoutingPolicy>&
00542     receive(size_t parts, bool check_terminal)
00543       throw (MessageFormatError, ZmqErrorType)
00544     {
00545       return receive(parts, 0, 0, check_terminal);
00546     }
00547 
          // Receive where the names array has exactly `parts` entries.
00552     inline
00553     Incoming <RoutingPolicy>&
00554     receive(
00555       size_t parts, const char* part_names[], bool check_terminal)
00556       throw (MessageFormatError, ZmqErrorType)
00557     {
00558       return receive(parts, part_names, parts, check_terminal);
00559     }
00560 
          // Primary receive_all overload, defined elsewhere.
00566     Incoming <RoutingPolicy>&
00567     receive_all(
00568       const size_t min_parts,
00569       const char* part_names[], size_t part_names_length)
00570       throw (MessageFormatError, ZmqErrorType);
00571 
          // receive_all where the names array has exactly `min_parts` entries.
00576     inline
00577     Incoming <RoutingPolicy>&
00578     receive_all(const size_t min_parts, const char* part_names[])
00579       throw (MessageFormatError, ZmqErrorType)
00580     {
00581       return receive_all(min_parts, part_names, min_parts);
00582     }
00583 
          // receive_all with no minimum and no part names.
00588     inline
00589     Incoming <RoutingPolicy>&
00590     receive_all()
00591       throw (MessageFormatError, ZmqErrorType)
00592     {
00593       return receive_all(0, 0, 0);
00594     }
00595 
          // receive_all with a minimum part count but no part names.
00600     inline
00601     Incoming <RoutingPolicy>&
00602     receive_all(const size_t min_parts)
00603       throw (MessageFormatError, ZmqErrorType)
00604     {
00605       return receive_all(min_parts, 0, 0);
00606     }
00607 
          // Defined elsewhere.
00613     Incoming <RoutingPolicy>&
00614     receive_up_to(size_t min_parts, const char* part_names[],
00615       size_t max_parts) throw (MessageFormatError, ZmqErrorType);
00616 
          // Defined elsewhere; `delimiter` defaults to a null pointer.
00626     int
00627     fetch_tail(std::vector<char>& area, const char* delimiter = 0)
00628       throw (ZmqErrorType);
00629 
          // Defined elsewhere.
00635     int
00636     drop_tail() throw(ZmqErrorType);
00637 
          // Generic extraction into `t`; defined elsewhere.
00644     template <typename T>
00645     Incoming<RoutingPolicy>&
00646     operator>> (T& t) throw(NoSuchPartError);
00647 
          // Extraction into a raw zmq message; defined elsewhere.
00652     Incoming<RoutingPolicy>&
00653     operator>> (zmq::message_t& msg) throw(NoSuchPartError);
00654 
          // Manipulator support: applies `f` to this object and returns its
          // result, enabling `in >> Skip`, `in >> Binary`, `in >> Text`.
00658     Incoming<RoutingPolicy>&
00659     operator>> (Incoming<RoutingPolicy>& (*f)(Incoming<RoutingPolicy>&))
00660     {
00661       return f(*this);
00662     }
00663 
          // Switches the mode flag to binary.
00669     void
00670     set_binary()
00671     {
00672       binary_mode_ = true;
00673     }
00674 
          // Switches the mode flag back to text.
00681     void
00682     set_text()
00683     {
00684       binary_mode_ = false;
00685     }
00686   };
00687 
00694   struct OutOptions
00695   {
          // Bundles a destination socket with a bit mask of option flags so
          // both can be handed to an Outgoing constructor as one argument.
          // `sock` is a reference member: the socket must outlive this object
          // and the struct cannot be copy-assigned. The flags are distinct
          // bits and can be OR-ed together. Their exact effect is implemented
          // outside this header; in this header only BINARY_MODE is acted on
          // (Sink::set_binary / Sink::set_text set and clear it).
          // NOTE(review): meanings of the other flags below are inferred from
          // their names only - confirm in the implementation.
00699     static const unsigned NONBLOCK = 0x1;
00703     static const unsigned CACHE_ON_BLOCK = 0x2;
00707     static const unsigned DROP_ON_BLOCK = 0x4;
00711     static const unsigned COPY_INCOMING = 0x8;
00712 
00716     static const unsigned EMULATE_BLOCK_SENDS = 0x10;
00717 
00723     static const unsigned BINARY_MODE = 0x20;
00724 
00725     zmq::socket_t& sock;
00726     unsigned options;
00727 
          // Stores the socket reference and the flag mask as given.
00728     inline OutOptions(zmq::socket_t& sock_p, unsigned options_p) :
00729       sock(sock_p), options(options_p)
00730     {}
00731   };
00732 
00742   class Sink : private Private::NonCopyable
00743   {
00744   public:
00749     template <typename T>
00750     class iterator
00751     {
00752     public:
00753       typedef T value_type;
00754       typedef T& reference;
00755       typedef T* pointer;
00756       typedef std::ptrdiff_t difference_type;
00757       typedef std::output_iterator_tag iterator_category;
00758 
00759     private:
00760       class AssignProxy
00761       {
00762       public:
00763         AssignProxy(Sink& outgoing)
00764             : outgoing_(outgoing)
00765         {
00766         }
00767 
00768         void
00769         operator=(const T& output_object)
00770         {
00771           outgoing_ << output_object;
00772         }
00773 
00774       private:
00775         Sink& outgoing_;
00776       };
00777 
00778     public:
00779       explicit
00780       iterator(Sink& outgoing)
00781           : outgoing_(outgoing)
00782       {
00783       }
00784 
00785       iterator
00786       operator++()
00787       {
00788         return *this;
00789       }
00790 
00791       iterator
00792       operator++(int)
00793       {
00794         return *this;
00795       }
00796 
00797       AssignProxy
00798       operator*()
00799       {
00800         return AssignProxy(outgoing_);
00801       }
00802 
00803       bool
00804       operator==(const iterator<T>& rhs)
00805       {
00806         return equal(rhs);
00807       }
00808 
00809       bool
00810       operator!=(const iterator<T>& rhs)
00811       {
00812         return !equal(rhs);
00813       }
00814 
00815     private:
00816       Sink& outgoing_;
00817 
00818       bool
00819       equal(const iterator<T>& rhs) const
00820       {
00821         return false;
00822       }
00823     };
00824 
00825   private:
00826     typedef std::auto_ptr<zmq::message_t> MsgPtr;
00827 
00828     zmq::socket_t& dst_;
00829 
00830     unsigned options_;
00831 
00837     Multipart* incoming_;
00838 
00839     std::auto_ptr<Multipart> outgoing_queue_;
00840 
00845     MsgPtr cached_;
00846 
00847     enum State
00848     {
00849       NOTSENT, //<! we haven't sent messages
00850       SENDING, //<! we have sent at least one message, it's OK
00851       QUEUEING,  //<! we failed to send, queueing to @c outgoing_queue_ instead
00852       DROPPING,  //<! we failed to send, dropped. Terminal state
00853       FLUSHED  //<! Message is flushed, all messages are sent or queued, no more messages accepted
00854     };
00855 
00856     State state_;
00857 
00858   protected:
00859     Sink(zmq::socket_t& dst, unsigned options, Multipart* incoming = 0) :
00860       dst_(dst), options_(options), incoming_(incoming),
00861       outgoing_queue_(0), cached_(0), state_(NOTSENT)
00862     {}
00863 
00864     inline
00865     unsigned
00866     options() const
00867     {
00868       return options_;
00869     }
00870 
00871     void
00872     send_one(
00873       zmq::message_t* msg, bool use_copy = false)
00874       throw(ZmqErrorType);
00875 
00876   private:
00877     void
00878     send_owned(zmq::message_t* owned) throw(ZmqErrorType);
00879 
00880     void
00881     do_send_one(
00882       zmq::message_t* msg, bool last) throw(ZmqErrorType);
00883 
00884     bool
00885     try_send_first_cached(
00886       bool last) throw(ZmqErrorType);
00887 
00888     void
00889     add_to_queue(zmq::message_t* msg);
00890 
00891   public:
00892     virtual
00893     ~Sink();
00894 
00899      const Multipart*
00900      incoming() const
00901      {
00902        return incoming_;
00903      }
00904 
00909      inline
00910      Multipart*
00911      detach()
00912      {
00913        return outgoing_queue_.release();
00914      }
00915 
00919      inline
00920      bool
00921      is_queued() const
00922      {
00923        return (outgoing_queue_.get() != 0);
00924      }
00925 
00930      inline
00931      bool
00932      is_dropping() const
00933      {
00934        return (state_ == DROPPING);
00935      }
00936 
00940      inline
00941      zmq::socket_t&
00942      dst()
00943      {
00944        return dst_;
00945      }
00946 
00950      void
00951      flush() throw(ZmqErrorType);
00952 
00953      void
00954      set_binary()
00955      {
00956        options_ |= OutOptions::BINARY_MODE;
00957      }
00958 
00959      void
00960      set_text()
00961      {
00962        options_ &= ~OutOptions::BINARY_MODE;
00963      }
00964 
00969      void
00970      send_incoming_messages(size_t idx_from = 0, size_t idx_to = UINT_MAX)
00971        throw(ZmqErrorType);
00972 
00977      void
00978      send_incoming_messages(Multipart& multipart,
00979        bool copy, size_t idx_from = 0, size_t idx_to = UINT_MAX)
00980        throw(ZmqErrorType);
00981 
00985      void
00986      relay_from(zmq::socket_t& relay_src) throw(ZmqErrorType);
00987 
00993      template <class OccupationAccumulator>
00994      void
00995      relay_from(
00996        zmq::socket_t& relay_src, OccupationAccumulator acc)
00997        throw (ZmqErrorType);
00998 
01003      template <typename T>
01004      Sink&
01005      operator<< (const T& t) throw (ZmqErrorType);
01006 
01015      Sink&
01016      operator<< (zmq::message_t& msg) throw (ZmqErrorType);
01017 
01023      inline Sink&
01024      operator<< (MsgPtr msg) throw (ZmqErrorType)
01025      {
01026        send_owned(msg.get() ? msg.release() : new zmq::message_t(0));
01027        return *this;
01028      }
01029 
01033      Sink&
01034      operator<< (const RawMessage& m) throw (ZmqErrorType);
01035 
01039      inline Sink&
01040      operator<< (Sink& (*f)(Sink&))
01041      {
01042        return f(*this);
01043      }
01044   };
01045 
01062   template <class RoutingPolicy>
01063   class Outgoing : public Sink
01064   {
          // Concrete Sink parameterised by a routing policy. Every
          // constructor initialises the Sink base and then immediately calls
          // send_routing(): with the routing parts of an Incoming when one is
          // supplied, with a null pointer otherwise. send_routing() itself is
          // defined outside this header.
01065   private:
01066 
01067     void
01068     send_routing(
01069       MsgPtrVec* routing) throw(ZmqErrorType);
01070 
01071   public:
01072 
          // Re-exports Sink's output iterator under this class's name.
01073     using Sink::iterator;
01074 
01078     typedef RoutingPolicy RoutingPolicyType;
01079 
          // No originating message; send_routing() receives null.
01080     Outgoing(zmq::socket_t& dst, unsigned options) :
01081       Sink(dst, options)
01082     {
01083       send_routing(0);
01084     }
01085 
          // Same as above, with socket and options packed in an OutOptions.
01086     explicit Outgoing(OutOptions out_opts) :
01087       Sink(out_opts.sock, out_opts.options)
01088     {
01089       send_routing(0);
01090     }
01091 
          // Ties this message to `incoming` (stored in the Sink base by
          // address, so `incoming` must outlive this object) and forwards its
          // routing parts. get_routing() is private in Incoming; access is
          // granted by Incoming's friend declaration for Outgoing.
01096     template <typename InRoutingPolicy>
01097     Outgoing(zmq::socket_t& dst,
01098       Incoming<InRoutingPolicy>& incoming,
01099       unsigned options) throw(ZmqErrorType) :
01100       Sink(dst, options, &incoming)
01101     {
01102       send_routing(incoming.get_routing());
01103     }
01104 
          // Same as above, with socket and options packed in an OutOptions.
01109     template <typename InRoutingPolicy>
01110     Outgoing(OutOptions out_opts,
01111       Incoming<InRoutingPolicy>& incoming) throw(ZmqErrorType) :
01112       Sink(out_opts.sock, out_opts.options, &incoming)
01113     {
01114       send_routing(incoming.get_routing());
01115     }
01116 
          // Ties this message to a plain Multipart, which carries no routing
          // policy: send_routing() receives null.
01121     Outgoing(zmq::socket_t& dst,
01122       Multipart& incoming,
01123       unsigned options) throw(ZmqErrorType) :
01124       Sink(dst, options, &incoming)
01125     {
01126       send_routing(0);
01127     }
01128 
          // Same as above, with socket and options packed in an OutOptions.
01133     Outgoing(OutOptions out_opts,
01134       Multipart& incoming) throw(ZmqErrorType) :
01135       Sink(out_opts.sock, out_opts.options, &incoming)
01136     {
01137       send_routing(0);
01138     }
01139   };
01140 }
01141 
01142 #endif /* ZMQMESSAGE_HPP_ */
01143 
01144 #include "zmqmessage/ZmqMessageTemplateImpl.hpp"

ZmqMessage 0.1 — open source software, support@zmqmessage.org