Overview Tutorial API reference Examples Build Download | ZmqMessage 0.1 - 21 Oct 2011 |
To integrate the library into your application you can (and encouraged to) define a few macro constants before including library headers (ZmqMessage.hpp and ZmqTools.hpp) anywhere in your application. ZmqMessageFwd.hpp may be included wherever (i.e. before the definitions). Though none of these definitions are mandatory.
These constants are:
class MyString { MyString(const char*, size_t); const char* data() const; size_t length() const; };
#define ZMQMESSAGE_LOG_STREAM ZMQMESSAGE_LOG_STREAM_NONE
#define ZMQMESSAGE_LOG_TERM ""
zmq::error_t
will be wrapped with exception generated according to ZMQMESSAGE_EXCEPTION_MACRO and named ZmqException
. Otherwise, it's thrown upwards as is. For non header-only builds must be the same in shared library and client code, just like ZMQMESSAGE_EXCEPTION_MACRO
In function `__static_initialization_and_destruction_0': /home/andrey_skryabin/projects/zmqmessage/include/zmqmessage/TypeCheck.hpp:32: undefined reference to `ZmqMessage::Private::TypeCheck<ZmqMessage::MessageFormatError, ZmqMessage::NoSuchPartError, zmq::error_t>::value' collect2: ld returned 1 exit status
//@file foo.cpp //make definitions available for linking #include "ZmqMessageImpl.hpp" ...
Such approach is better than header-only, cause it reduces compilation time.
To receive multipart message, you create instance of ZmqMessage::Incoming. For XREQ and XREP socket types use ZmqMessage::XRouting as template parameter, for other socket types use ZmqMessage::SimpleRouting.
#include "ZmqMessage.hpp" zmq::context_t ctx(1); zmq::socket sock(ctx, ZMQ_PULL); //will use SimpleRouting sock.connect("inproc://some-endpoint"); ZmqMessage::Incoming<ZmqMessage::SimpleRouting> incoming(sock); //say, we know what message parts we receive. Here are their names: const char* req_parts[] = {"id", "name", "blob"}; //true because it's a terminal message, no more parts allowed at end incoming.receive(3, req_parts, true); //Get 2nd part explicitly (assume ZMQMESSAGE_STRING_CLASS is std::string): std::string name = ZmqMessage::get_string(incoming[1]); //or more verbose: std::string name_cpy = ZmqMessage::get<std::string>(incoming[1]); //if we also have some MyString class: MyString my_id = ZmqMessage::get<MyString>(incoming[0]); //or we can extract data as variables or plain zmq messages. zmq::message_t blob; incoming >> my_id >> name >> blob; //or we can iterate on message parts and use standard algorithms: std::ostream_iterator<MyString> out_it(std::cout, ", "); std::copy( incoming.begin<MyString>(), incoming.end<MyString>(), out_it);
Of course, passing message part names is not necessary (see receive functions).
There are cases when implemented protocol implies a fixed number of message parts at the beginning of multipart message. And the number of subsequent messages is either undefined or estimated based on contents of first parts (i.e. first part may contain command name, and other parts may contain data dependent on command). So you first may call receive function with false
as last parameter (not checking if message is terminal), do something with first parts, and then call receive or receive_all again to fetch the rest.
const char* req_parts[] = {"command"}; incoming.receive(1, req_parts, false); std::string cmd = ZmqMessage::get_string(incoming[0]); if (cmd == "SET_PARAM") { const char* tail_parts[] = {"parameter 1 value (text)", "parameter 2 value (binary)"}; incoming.receive(2, tail_parts, true); //message with parameter 1 contains text data converted into unsigned 32 bit integer (ex. "678" -> 678) uint32_t param1 = ZmqMessage::get<uint32_t>(incoming[1]); //message with parameter 2 contains binary data (unsigned 32 bit integer) uint32_t param2 = ZmqMessage::get_bin<uint32_t>(incoming[2]); //or this way (second parameter defines binary/text mode) uint32_t param2_copy = ZmqMessage::get<uint32_t>(incoming[2], true); //... } else { //otherwise we receive all remaining message parts incoming.receive_all(); if (incoming.size() > 1) { std::string first; incoming >> ZmqMessage::Skip //command is already analyzed >> first; // same as first = ZmqMessage::get<std::string>(incoming[1]); //... } }
To send a multipart message, you create instance of ZmqMessage::Outgoing. For sending to XREQ and XREP socket types use ZmqMessage::XRouting as template parameter, for other socket types use ZmqMessage::SimpleRouting.
#include "ZmqMessage.hpp" zmq::context_t ctx(1); zmq::socket sock(ctx, ZMQ_XREQ); //will use XRouting sock.connect("inproc://some-endpoint"); //create Outgoing specifying sending options: use nonblocking send //and drop messages if operation would block ZmqMessage::Outgoing<ZmqMessage::XRouting> outgoing( sock, ZmqMessage::OutOptions::NONBLOCK | ZmqMessage::OutOptions::DROP_ON_BLOCK); //suppose we have some MyString class: MyString id("112233"); outgoing << id << "SET_VARIABLES"; //Number will be converted to string (written to stream), cause Outgoing is in Text mode. outgoing << 567099; char buffer[128]; ::memset (buffer, 'z', 128); //fill buffer outgoing << ZmqMessage::RawMessage(buffer, 128); //send message with binary number and flush it; int num = 9988; outgoing << ZmqMessage::Binary << num << ZmqMessage::Flush;
Note, that Outgoing template class is derived from ZmqMessage::Sink class, and all the functionality for inserting and sending messages is encapsulated there. It's done that way because Outgoing depends on RoutingPolicy template parameter only on construction and implicit sending of routing message parts, so after creation, Outgoing may be referenced as non-template Sink (passed to functions, etc.):
void send_parts(ZmqMessage::Sink& sink) { sink << "some data" << 5678 << ZmqMessage::Flush; } int main() { ZmqMessage::Outgoing<ZmqMessage::XRouting> outgoing(...); send_parts(outgoing); return 0; }
Flushing Outgoing message sends final (terminal) message part (which was inserted before flushing), and no more insertions allowed after it. If you do not flush Outgoing message manually, it will flush in destructor.