00001
00013 #ifndef ZMQMESSAGE_ZMQTOOLSFULLIMPL_HPP_
00014 #define ZMQMESSAGE_ZMQTOOLSFULLIMPL_HPP_
00015
00016 #include <string>
00017
00018 namespace
00019 {
00020 void
00021 zmqmessage_free(void *data, void *hint)
00022 {
00023 ::free(data);
00024 }
00025 }
00026
00027 namespace ZmqMessage
00028 {
00029 time_t
00030 get_time(zmq::message_t& message)
00031 {
00032 errno = 0;
00033 std::string str;
00034 get(message, str);
00035 time_t tm = strtol(str.data(), 0, 10);
00036 if (errno)
00037 {
00038 return time_t();
00039 }
00040 return tm;
00041 }
00042
00043 void
00044 init_msg(const void* t, size_t sz, zmq::message_t& msg)
00045 {
00046 try
00047 {
00048 void *data = ::malloc(sz);
00049 if (!data)
00050 {
00051 throw zmq::error_t();
00052 }
00053 ::memcpy(data, t, sz);
00054 msg.rebuild(data, sz, &zmqmessage_free, 0);
00055 }
00056 catch (const zmq::error_t& e)
00057 {
00058 throw_zmq_exception(e);
00059 }
00060 }
00061
00062 bool
00063 has_more(zmq::socket_t& sock)
00064 {
00065 int64_t more = 0;
00066 size_t more_size = sizeof(more);
00067 sock.getsockopt(ZMQ_RCVMORE, &more, &more_size);
00068 return (more != 0);
00069 }
00070
00071 int
00072 relay_raw(zmq::socket_t& src, zmq::socket_t& dst, bool check_first_part)
00073 {
00074 int relayed = 0;
00075 for (
00076 bool more = check_first_part ? has_more(src) : true;
00077 more; ++relayed)
00078 {
00079 zmq::message_t cur_part;
00080 src.recv(&cur_part);
00081 more = has_more(src);
00082 int flag = more ? ZMQ_SNDMORE : 0;
00083 send_msg(dst, cur_part, flag);
00084 }
00085 return relayed;
00086 }
00087 }
00088
00089 #endif