ZmqMessage 0.1 - 21 Oct 2011

Tutorial


Configuring library

To integrate the library into your application, you can (and are encouraged to) define a few macro constants before including the library headers (ZmqMessage.hpp and ZmqTools.hpp) anywhere in your application. ZmqMessageFwd.hpp may be included anywhere, including before these definitions. None of the definitions is mandatory.
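The definitions have to come before the library headers because of the usual preprocessor pattern in which a header supplies a default only when the macro is not yet defined. The following is a minimal sketch of that pattern, not the library's actual code: DEMO_STRING_CLASS, MyString and demo_get_string are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// --- application side: override the default BEFORE the "header" part ---
// MyString and DEMO_STRING_CLASS are hypothetical, not part of ZmqMessage.
struct MyString
{
  std::string data;
  MyString(const char* p, std::size_t n) : data(p, n) {}
};
#define DEMO_STRING_CLASS MyString

// --- "header" side: fall back to std::string only if nothing was defined ---
#ifndef DEMO_STRING_CLASS
#define DEMO_STRING_CLASS std::string
#endif

// Builds the configured string class from a raw buffer.
inline DEMO_STRING_CLASS demo_get_string(const char* p, std::size_t n)
{
  assert(p != 0 || n == 0); // a null buffer is only acceptable when empty
  return DEMO_STRING_CLASS(p, n);
}
```

If the #define were placed after the "header" part, the function above would already have been compiled against the default, which is why the order matters.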

These constants are:

Linking options

  1. You may build ZmqMessage as a shared library (see build instructions) and link against it. If you need extensive configuration, however, the following problem may arise: the library is built separately, with a fixed and essentially default configuration (ZMQMESSAGE_LOG_STREAM, ZMQMESSAGE_EXCEPTION_MACRO, ZMQMESSAGE_WRAP_ZMQ_ERROR). If you override any of these in your application, you must rebuild the shared library with the same definitions that your application uses; otherwise you may get a link error like the following:
    In function `__static_initialization_and_destruction_0':
    /home/andrey_skryabin/projects/zmqmessage/include/zmqmessage/TypeCheck.hpp:32: undefined reference to `ZmqMessage::Private::TypeCheck<ZmqMessage::MessageFormatError, ZmqMessage::NoSuchPartError, zmq::error_t>::value'
    collect2: ld returned 1 exit status
    
    The error occurs because the library checks, at link time, that the types generated from the configuration are the same in the application and in the shared library. Link with the shared library if you don't need extensive configuration, or if rebuilding it with the same configuration is not too cumbersome for you.
  2. Do not link against the shared library. In this case you MUST include the implementation code in exactly ONE of your .cpp files in order to build your binary:
    //@file foo.cpp
    
    //make definitions available for linking
    #include "ZmqMessageImpl.hpp"
    
    ...
    
    Do not include "ZmqMessageImpl.hpp" in more than one .cpp file, or you will get multiple definitions and therefore a link error.

    This approach is better than a header-only one because it reduces compilation time.
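One way a link-time check of this kind could work is sketched below. This is an assumption about the general technique, not the library's actual implementation: a static member is declared in a header but defined only for the combination of types the library was built with, so an application compiled with different types refers to a symbol that does not exist. All type names here are hypothetical.

```cpp
#include <cassert>

// Hypothetical stand-ins for the configurable types (not ZmqMessage's own).
struct DefaultFormatError {};
struct DefaultNoSuchPartError {};
struct CustomFormatError {};

// "Header" part: the static member is declared here but not defined.
template <class FormatError, class PartError>
struct TypeCheck
{
  static const bool value;
};

// "Library" part: compiled once, with the library's own configuration.
// Only this exact combination of types gets a definition.
template <>
const bool TypeCheck<DefaultFormatError, DefaultNoSuchPartError>::value = true;

// "Application" part: touches the member for the types it was compiled with.
// With matching types this links. Referring to
// TypeCheck<CustomFormatError, DefaultNoSuchPartError>::value instead
// would leave an undefined reference at link time.
inline bool configuration_matches()
{
  return TypeCheck<DefaultFormatError, DefaultNoSuchPartError>::value;
}
```

In a real build the three parts would live in a header, the shared library and the application respectively; they are shown in one file only to keep the sketch compilable.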

Receiving messages

To receive a multipart message, create an instance of ZmqMessage::Incoming. For XREQ and XREP socket types, use ZmqMessage::XRouting as the template parameter; for other socket types, use ZmqMessage::SimpleRouting.

#include "ZmqMessage.hpp"

zmq::context_t ctx(1);
zmq::socket_t sock(ctx, ZMQ_PULL); //will use SimpleRouting
sock.connect("inproc://some-endpoint");

ZmqMessage::Incoming<ZmqMessage::SimpleRouting> incoming(sock);

//say, we know what message parts we receive. Here are their names:
const char* req_parts[] = {"id", "name", "blob"};

//true because it's a terminal message, no more parts allowed at end
incoming.receive(3, req_parts, true);

//Get 2nd part explicitly (assume ZMQMESSAGE_STRING_CLASS is std::string):
std::string name = ZmqMessage::get_string(incoming[1]);
//or more verbose:
std::string name_cpy = ZmqMessage::get<std::string>(incoming[1]);

//if we also have some MyString class:
MyString my_id = ZmqMessage::get<MyString>(incoming[0]);

//or we can extract data as variables or plain zmq messages.
zmq::message_t blob;
incoming >> my_id >> name >> blob;

//or we can iterate on message parts and use standard algorithms:
std::ostream_iterator<MyString> out_it(std::cout, ", ");
std::copy(
  incoming.begin<MyString>(), incoming.end<MyString>(), out_it);

Of course, passing message part names is not necessary (see receive functions).
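The effect of the last argument of receive in the example above can be pictured with a small in-memory model. This is only a sketch under assumptions: MockIncoming is a hypothetical class, the "wire" is a plain vector instead of a socket, and the use of std::runtime_error for violations is an assumption rather than the library's documented behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a socket delivering one multipart message.
class MockIncoming
{
public:
  explicit MockIncoming(const std::vector<std::string>& wire)
    : pending_(wire.begin(), wire.end()) {}

  // Takes `count` more parts. If `terminal` is true, the message must end
  // exactly there; leftover parts are treated as an error.
  void receive(std::size_t count, bool terminal)
  {
    if (pending_.size() < count)
      throw std::runtime_error("message ended too early");
    for (std::size_t i = 0; i < count; ++i)
    {
      parts_.push_back(pending_.front());
      pending_.pop_front();
    }
    if (terminal && !pending_.empty())
      throw std::runtime_error("unexpected extra parts");
  }

  // Number of parts received so far.
  std::size_t size() const { return parts_.size(); }

  // Access to a received part by index (throws std::out_of_range if absent).
  const std::string& operator[](std::size_t i) const { return parts_.at(i); }

private:
  std::deque<std::string> pending_;  // parts still "on the wire"
  std::vector<std::string> parts_;   // parts already received
};
```

With a three-part message, receive(3, true) succeeds, receive(2, true) fails because one part is left over, and receive(1, false) followed by receive(2, true) succeeds in two steps.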

Some protocols fix the number of message parts at the beginning of a multipart message, while the number of subsequent parts is either undefined or depends on the contents of the first parts (e.g. the first part may contain a command name, and the other parts may contain data that depends on the command). In such cases you can first call receive with false as the last parameter (so that it does not check whether the message is terminal), process the first parts, and then call receive or receive_all again to fetch the rest.

const char* req_parts[] = {"command"};
incoming.receive(1, req_parts, false);

std::string cmd = ZmqMessage::get_string(incoming[0]);

if (cmd == "SET_PARAM")
{
  const char* tail_parts[] = {"parameter 1 value (text)", "parameter 2 value (binary)"};
  incoming.receive(2, tail_parts, true);

  //message with parameter 1 contains text data converted into unsigned 32 bit integer (ex. "678" -> 678)
  uint32_t param1 = ZmqMessage::get<uint32_t>(incoming[1]);

  //message with parameter 2 contains binary data (unsigned 32 bit integer)
  uint32_t param2 = ZmqMessage::get_bin<uint32_t>(incoming[2]);
  //or this way (second parameter defines binary/text mode)
  uint32_t param2_copy = ZmqMessage::get<uint32_t>(incoming[2], true);

  //...
}
else
{
  //otherwise we receive all remaining message parts
  incoming.receive_all();

  if (incoming.size() > 1)
  {
    std::string first;
    incoming >> ZmqMessage::Skip //command is already analyzed
      >> first; // same as first = ZmqMessage::get<std::string>(incoming[1]);
    //...
  }
}
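The difference between text and binary extraction used in the example above can be sketched with the standard library alone. The functions from_text and from_binary are hypothetical names, message parts are modelled as std::string byte buffers, and the error handling is an assumption; the library's own conversions may differ in detail.

```cpp
#include <cassert>
#include <cstring>
#include <sstream>
#include <stdexcept>
#include <string>

// Text mode: the part holds characters such as "678"; parse them via a stream.
template <typename T>
T from_text(const std::string& part)
{
  std::istringstream is(part);
  T value;
  if (!(is >> value))
    throw std::runtime_error("part is not a valid text value");
  return value;
}

// Binary mode: the part holds the raw bytes of T, in the sender's byte order.
template <typename T>
T from_binary(const std::string& part)
{
  if (part.size() != sizeof(T))
    throw std::runtime_error("part size does not match the requested type");
  T value;
  std::memcpy(&value, part.data(), sizeof(T));
  return value;
}
```

Text mode is portable between machines, while binary mode is more compact but requires both sides to agree on the size and byte order of the type.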

Sending messages

To send a multipart message, create an instance of ZmqMessage::Outgoing. For sending to XREQ and XREP socket types, use ZmqMessage::XRouting as the template parameter; for other socket types, use ZmqMessage::SimpleRouting.

#include "ZmqMessage.hpp"

zmq::context_t ctx(1);
zmq::socket_t sock(ctx, ZMQ_XREQ); //will use XRouting
sock.connect("inproc://some-endpoint");

//create Outgoing specifying sending options: use nonblocking send
//and drop messages if operation would block
ZmqMessage::Outgoing<ZmqMessage::XRouting> outgoing(
  sock, ZmqMessage::OutOptions::NONBLOCK | ZmqMessage::OutOptions::DROP_ON_BLOCK);

//suppose we have some MyString class:
MyString id("112233");

outgoing << id << "SET_VARIABLES";

//The number will be converted to a string (written to a stream), because Outgoing is in Text mode.
outgoing << 567099;

char buffer[128];
::memset (buffer, 'z', 128); //fill buffer

outgoing << ZmqMessage::RawMessage(buffer, 128);

//send message with binary number and flush it;
int num = 9988;
outgoing << ZmqMessage::Binary << num << ZmqMessage::Flush;
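Manipulators such as ZmqMessage::Binary in the example above follow the familiar stream idiom: inserting a tag object changes the state of the sink instead of producing a message part. The sketch below illustrates the idiom with hypothetical names (ModeSink, TextTag, BinaryTag) and collects the encoded parts in a vector rather than sending them. Whether the library keeps the mode after one insertion is not stated in this tutorial; here it is assumed to persist until changed.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical tag types playing the role of mode manipulators.
struct TextTag {};
struct BinaryTag {};
const TextTag Text = TextTag();
const BinaryTag Binary = BinaryTag();

// Hypothetical sink that collects encoded parts instead of sending them.
class ModeSink
{
public:
  ModeSink() : binary_(false) {}

  // Manipulators only change state; they do not produce a message part.
  ModeSink& operator<<(TextTag) { binary_ = false; return *this; }
  ModeSink& operator<<(BinaryTag) { binary_ = true; return *this; }

  // Strings are stored as-is in both modes.
  ModeSink& operator<<(const std::string& s) { parts.push_back(s); return *this; }
  ModeSink& operator<<(const char* s) { parts.push_back(s); return *this; }

  // Numbers are stored as raw bytes in binary mode, as characters otherwise.
  ModeSink& operator<<(int v)
  {
    if (binary_)
    {
      parts.push_back(std::string(reinterpret_cast<const char*>(&v), sizeof v));
    }
    else
    {
      std::ostringstream os;
      os << v;
      parts.push_back(os.str());
    }
    return *this;
  }

  std::vector<std::string> parts;  // encoded parts, in insertion order

private:
  bool binary_;  // current mode; text by default
};
```

With this sketch, inserting 567099 in the default mode yields a six-character part, while inserting a number after Binary yields a part of sizeof(int) bytes.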

Note that the Outgoing template class is derived from the ZmqMessage::Sink class, which encapsulates all the functionality for inserting and sending messages. It is designed this way because Outgoing depends on the RoutingPolicy template parameter only during construction and the implicit sending of routing message parts. After creation, an Outgoing may therefore be referenced as a non-template Sink (passed to functions, etc.):

void send_parts(ZmqMessage::Sink& sink)
{
  sink << "some data" << 5678 << ZmqMessage::Flush;
}

int main()
{
  ZmqMessage::Outgoing<ZmqMessage::XRouting> outgoing(...);

  send_parts(outgoing);

  return 0;
}
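The design described above, a non-template base with a thin policy-parameterised subclass, can be sketched without any ZeroMQ dependency. BaseSink, PolicyOutgoing and the two routing policies are hypothetical names, and the prefix written by EnvelopeRouting is only a placeholder for whatever routing parts a real policy would emit.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Non-template base: everything about inserting parts lives here.
class BaseSink
{
public:
  BaseSink& operator<<(const std::string& part)
  {
    parts_.push_back(part);
    return *this;
  }
  const std::vector<std::string>& parts() const { return parts_; }

protected:
  // Only derived classes create or destroy sinks.
  BaseSink() {}
  ~BaseSink() {}

private:
  std::vector<std::string> parts_;
};

// Hypothetical routing policies: each decides which parts go first.
struct NoRouting
{
  static void write_prefix(BaseSink&) {}
};
struct EnvelopeRouting
{
  static void write_prefix(BaseSink& sink) { sink << "peer-identity" << ""; }
};

// The policy is used only while constructing; afterwards the object can be
// handled as a plain BaseSink.
template <class RoutingPolicy>
class PolicyOutgoing : public BaseSink
{
public:
  PolicyOutgoing() { RoutingPolicy::write_prefix(*this); }
};

// Non-template code that works with any routing policy.
inline void append_payload(BaseSink& sink)
{
  sink << "some data" << "5678";
}
```

Because append_payload takes a BaseSink reference, it is compiled once and does not need to be a template, which is the benefit this design aims for.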

Flushing an Outgoing message sends the final (terminal) message part (the one inserted last before flushing); no more insertions are allowed after that. If you do not flush an Outgoing message manually, it is flushed in its destructor.
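A minimal sketch of this flushing behaviour, assuming that the most recent part is held back until the sink knows whether another part follows. AutoFlushMessage is a hypothetical class, the "wire" is a vector of strings, the " [more]" and " [last]" suffixes merely mark what a real implementation would express with send flags, and throwing std::logic_error on a late insertion is an assumption.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical message builder; `wire` stands in for the socket.
class AutoFlushMessage
{
public:
  explicit AutoFlushMessage(std::vector<std::string>& wire)
    : wire_(wire), has_pending_(false), flushed_(false) {}

  // The newest part is held back, because only flush() knows it is the last.
  AutoFlushMessage& operator<<(const std::string& part)
  {
    if (flushed_)
      throw std::logic_error("insertion after flush");
    if (has_pending_)
      wire_.push_back(pending_ + " [more]");
    pending_ = part;
    has_pending_ = true;
    return *this;
  }

  // Sends the held-back part as the terminal one and closes the message.
  void flush()
  {
    if (flushed_)
      return;
    if (has_pending_)
      wire_.push_back(pending_ + " [last]");
    has_pending_ = false;
    flushed_ = true;
  }

  // Flushes automatically if the caller did not; must not throw.
  ~AutoFlushMessage()
  {
    try { flush(); } catch (...) {}
  }

private:
  std::vector<std::string>& wire_;
  std::string pending_;
  bool has_pending_;
  bool flushed_;
};
```

Relying on the destructor is convenient for short scopes, while an explicit flush makes the moment of sending visible and lets the caller handle errors.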


ZmqMessage 0.1 — open source software, support@zmqmessage.org