How could you develop a simple DEALER/ROUTER message flow, using ZeroMQ?


I'm fairly new to TCP messaging (and programming in general) and I am trying to develop a simple ROUTER/DEALER message pair with ZeroMQ, but I am struggling to get the ROUTER to receive a message from the DEALER and send one back.

I can do a simple REQ/REP pattern with no problem, where I can send one message from my machine to my VM.

However, when trying to develop a ROUTER/DEALER pair, I can't seem to get the ROUTER-instance to receive the message (ROUTER on VM, DEALER on main box). I have had some success where I could spam 50 messages in a while(){...} loop, but can't send a single message and have the ROUTER send one back.

So from what I have read, a TCP message in a ROUTER/DEALER pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER to register an incoming message.

So I just want to send the message "ROUTER_TEST" to my server, and for my server to respond with "RECEIVED".

DEALER

#include <cstdlib>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>

#include "zmq.h"

const char connection[] = "tcp://10.0.10.76:5555";
int main(void)
{
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch);
    printf("Connecting to: %s\n", connection);

    void *context = zmq_ctx_new();

    void *requester = zmq_socket(context, ZMQ_DEALER);

    int zc = zmq_connect(requester, connection); 
    std::cout << "zmq_connect = " << zc << std::endl;

    int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;

    char messageSend[] = "ROUTER_TEST";

    int request_nbr;
    int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE );
    int ii = 0;
    if(n==0) {
        std::cout << "n = " << n << std::endl;
    while (ii < 50)
    {
        n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT);

        ii++;
    }
    }

    return 0;
}

ROUTER

// SERVER
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

#include "zmq.h"

int main(void)
{
    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_ROUTER);
    printf("THIS IS WORKING - ROUTER\n");
    int rc = zmq_bind(responder, "tcp://*:5555");
    assert(rc == 0);

    zmq_pollitem_t pollItems[] = {
        {responder, 0, ZMQ_POLLIN, -1}};

    int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;
    uint8_t buffer[15];
    while (1)
    {
        int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT);
        if (rc == 0)
        {
            std::cout << "zmq_recv = " << rc << std::endl;
            zmq_send(responder, "RECIEVED", 9,0);
        }

        zmq_poll(pollItems, sizeof(pollItems), -1);

    }
    return 0;
}

Solution

  • On the DEALER side, your code makes the following series of calls:

    void *requester = zmq_socket( context,
                                  ZMQ_DEALER           // requester is a DEALER socket
                                  );
    ...
    int n   = zmq_send( requester,                     // returns 0 bytes on success, stored in n
                        NULL,                          // no payload and
                        0,                             //   an explicit length of 0: an empty frame
                        ZMQ_DONTWAIT                   // non-blocking send
                      | ZMQ_SNDMORE                    // 1x ZMQ_SNDMORE flag ==
                        );                             //   1st frame of the 1st message
    int ii  = 0;                                       // the 1st message is still under construction
    if ( n == 0 )                                      //   and is not sent until it is complete
    {
         std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl;

         while ( ii < 50 )
         {       ii++;
                 n = zmq_send( requester,   // 1st pass: 2nd frame of the 1st message,
                               messageSend, //   sent without ZMQ_SNDMORE, so the message
                               31,          //   is now complete and will get sent
                               ZMQ_DONTWAIT // passes 2..50:
                               );           //   49 single-frame messages follow
         }
    }
    ...
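To make the frame grouping above concrete, here is a small model in plain C++ that replays the same call sequence without touching the network. It is only a sketch: `QueuedFrame`, `group_into_messages` and `replay_dealer_calls` are hypothetical names invented for this illustration and are not part of libzmq. The single rule it encodes is that a message ends at the first frame queued without ZMQ_SNDMORE.

```cpp
#include <string>
#include <vector>

// One queued frame: its payload plus whether ZMQ_SNDMORE was set on the call.
// (Hypothetical model type, not part of libzmq.)
struct QueuedFrame {
    std::string payload;
    bool        more;
};

using Message = std::vector<std::string>;

// Group frames into messages: a message is complete at the first frame
// that was queued WITHOUT the "more" flag.
std::vector<Message> group_into_messages(const std::vector<QueuedFrame>& frames)
{
    std::vector<Message> messages;
    Message current;
    for (const QueuedFrame& f : frames) {
        current.push_back(f.payload);
        if (!f.more) {
            messages.push_back(current);
            current.clear();
        }
    }
    return messages;   // a trailing, unfinished message is not emitted
}

// Replays the call sequence of the DEALER code discussed above.
std::vector<Message> replay_dealer_calls()
{
    std::vector<QueuedFrame> calls;
    calls.push_back({"", true});                  // empty frame + ZMQ_SNDMORE
    for (int i = 0; i < 50; ++i)
        calls.push_back({"ROUTER_TEST", false});  // 50x payload, no ZMQ_SNDMORE
    return group_into_messages(calls);
}
```

Calling `replay_dealer_calls()` yields 50 messages: the first has two frames (the empty one and the payload), the other 49 have one frame each.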
    

    What happens on the opposite side, in the ROUTER-side code?

    ...
    while (1)
    {       
            int  rc  = zmq_recv( responder, //----------------- 1st .recv()
                                 buffer,
                                 5,
                                 ZMQ_DONTWAIT
                                 );
            if ( rc == 0 )
            {
                std::cout << "zmq_recv = " << rc << std::endl;
                zmq_send( responder,  // _____________________ void  *socket
                          "RECEIVED", // _____________________ void  *buffer
                          9,  // _____________________________ size_t len
                          0   // _____________________________ int    flags
                          );
            }
            zmq_poll( pollItems,
                      sizeof( pollItems ),
                      -1 // __________________________________ block ( forever )
                      );//                                     till  ( if ever ) ...?
    }
    

    Here, most probably, rc == 0 is seen only once (for the empty frame), if it is not missed, and never again.

    Kindly notice that your code does not detect in any way whether a .recv()-call is also flagged with ZMQ_RCVMORE, which signals the need to first .recv() all the remaining frames of a multi-frame message before being able to .send() any answer.

    An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_recv() to determine if there are further parts to receive.

    Next, the buffer and messageSend message-"payloads" are rather fragile entities and ought to be re-composed (for details, best re-read how to carefully initialise, work with and safely touch any zmq_msg_t objects): after a successful zmq_msg_send() the low-level API nullifies the zmq_msg_t, so it cannot be re-used without being re-initialised. Plain buffers handed to zmq_send() are copied, so those do stay re-usable. Also note that messageSend is not 31 chars long, as the hard-coded length in the code claims: "ROUTER_TEST" occupies 12 bytes including the terminating NUL, so zmq_send() is told to read past the end of the array. Was there any particular intention behind this?
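The length mismatch is easy to check in isolation. The sketch below uses the same text as the DEALER code; `text_length` and `array_length` are hypothetical helper names introduced only for this illustration.

```cpp
#include <cstddef>
#include <cstring>

// Same text as in the DEALER code.
const char messageSend[] = "ROUTER_TEST";

// Number of characters, without the terminating NUL.
std::size_t text_length()  { return std::strlen(messageSend); }

// Size of the whole array, including the terminating NUL.
std::size_t array_length() { return sizeof(messageSend); }
```

Here `text_length()` is 11 and `array_length()` is 12, so either value would be a safe `len` argument for zmq_send(), depending on whether the receiver should get the NUL byte. The value 31 is not.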

    The zmq_send() function shall return number of bytes in the message if successful. Otherwise it shall return -1 and set errno to one of the values defined below. { EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }

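    Not testing the return value and errno means knowing nothing about the actual state of the socket (see EFSM and the other potential trouble explainers). REQ/REP enforce a strict .send()/.recv()/.send()/.recv()/... order of steps as a deterministic finite-state automaton; DEALER/ROUTER lift that lock-step, yet every call can still fail, e.g. with EAGAIN when ZMQ_DONTWAIT is used.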


    "So from what I have read, a TCP message in a ROUTER/DEALER pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER to register an incoming message."

    This part seems to be misunderstood. The application is free to compose any number of single-frame or multi-frame messages, yet the "trick" of the ROUTER-prepended identity frame is performed without the user's assistance: the ROUTER labels each incoming message automatically with the sender's identity before delivering it to the application through the receiver's .recv() call, so every message a ROUTER delivers is, in principle, a multi-frame one. The due handling of multi-frame messages was noted above.