Search code examples
c++zeromq

ZeroMQ: how to use multiple Publishers and a single Client, using C < C11


I am new to ZeroMQ.
I have multiple publishers and one client. Seeking suggestions to implement it in a best way.
Currently its making use of a reply - request pattern for a single client and a server; this has to be extended to multiple publishers and a single subscriber.

This application is going to run on a QNX-system that does not support C11, so zmq::multipart_t is not helping.

void TransportLayer::Init()
{
    socket.bind( "tcp://*:5555" );
}

void TransportLayer::Receive()
{
    while ( true ) {
        zmq::message_t request;
        string protoBuf;
        socket.recv( &request );

        uint16_t id = *( (uint16_t*)request.data() );
        protoBuf = std::string( static_cast<char*>( request.data()
                                                  + sizeof( uint16_t )
                                                    ),
                                request.size() - sizeof( uint16_t )
                                );
        InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf );
    }
    Send();
    usleep( 1 );
}

void TransportLayer::Send()
{
    zmq::message_t reply( 1 );
    memcpy( reply.data(), "#", 1 );

    socket.send( reply );
}

This is the code that I had written, this was initially designed to listen to only one client, now I have to extend it to listen to multiple clients.

I tried using zmq::multipart_t but this requires C11 support but the QNX-version we are using does not support C11.


I tried implementing the proposed solution.
I created 2 publishers connecting to same static location.

Observation :

I )
Execution Order :
1. Started Subscriber
2. Started Publisher1 ( it published only one data value )

Subscriber missed to receive this data.

II )
modified Publisher1 to send the same data in a while loop
Execution Order :
1. Started Subscriber
2. Started Publisher1
3. Started Publsiher2.

Now I see that the Subscriber is receiving the data from both publishers.

This gives me an indication that there is a possibility for data loss.

How do I ensure there is absolutely no data loss?


Here is my source code :

Publisher 2 :

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData() {

while ( std::getline(file, line_str) ) {

        std::stringstream ss(line_str);
        std::string direction;

        double tdiff;
        int    i, _1939, pgn, priority, source, length, data[8];
        char   J, p, _0, dash, d;

        ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
           >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
           >> data[3] >> data[4] >> data[5] >> data[6] >> data[7];

        timestamp += tdiff;

        while (            gcl_get_time_ms() - start_time <
                uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); }

        if (arguments.verbose) {
            std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
                << pgn << " " << p << " " << priority << " " << _0 << " " << source
                << " " << dash << " " << direction << " " << d << " " << length
                << " " << data[0] << " " << data[1] << " " << data[2] << " "
                << data[3] << " " << data[4] << " " << data[5] << " " << data[6]
                << " " << data[7] << std::endl;
        }

        uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);

        protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */
        protoTable.add_columnvalues(intToString(pgn));             /* PGN       */
        protoTable.add_columnvalues(intToString(priority));        /* Priority  */
        protoTable.add_columnvalues(intToString(source));          /* Source    */
        protoTable.add_columnvalues(direction);                    /* Direction */
        protoTable.add_columnvalues(intToString(length));          /* Length    */
        protoTable.add_columnvalues(intToString(data[0]));         /* data1     */
        protoTable.add_columnvalues(intToString(data[1]));         /* data2     */
        protoTable.add_columnvalues(intToString(data[2]));         /* data3     */
        protoTable.add_columnvalues(intToString(data[3]));         /* data4     */
        protoTable.add_columnvalues(intToString(data[4]));         /* data5     */
        protoTable.add_columnvalues(intToString(data[5]));         /* data6     */
        protoTable.add_columnvalues(intToString(data[6]));         /* data7     */
        protoTable.add_columnvalues(intToString(data[7]));         /* data8     */

    zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t));
        *((uint16_t*)create_values.data()) = TABLEMSG_ID;  // ID
        protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong());

        socket.send(create_values);

        protoTable.clear_columnvalues();
        usleep(1);
    }

}

Publisher 1 :

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData()
{
   cout << "In SendData" << endl;

   while(1) { 
       canlogreq canLogObj = canlogreq::default_instance();
                 canLogObj.set_fromhours(11);
                 canLogObj.set_fromminutes(7);
                 canLogObj.set_fromseconds(2);
                 canLogObj.set_fromday(16);
                 canLogObj.set_frommonth(5);
                 canLogObj.set_fromyear(2020);
                 canLogObj.set_tohours(12);
                 canLogObj.set_tominutes(7);
                 canLogObj.set_toseconds(4);
                 canLogObj.set_today(17);
                 canLogObj.set_tomonth(5);
                 canLogObj.set_toyear(2020);

       zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));

       *((uint16_t*)logsnippetmsg.data()) = 20;

       canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong());

       socket.send(logsnippetmsg);

       usleep(1);

       canLogObj.clear_fromhours();
       canLogObj.clear_fromminutes();
       canLogObj.clear_fromseconds();
       canLogObj.clear_fromday();
       canLogObj.clear_frommonth();
       canLogObj.clear_fromyear();
       canLogObj.clear_tohours();
       canLogObj.clear_tominutes();
       canLogObj.clear_toseconds();
       canLogObj.clear_today();
       canLogObj.clear_tomonth();
       canLogObj.clear_toyear();
   }

}

Subscriber :

TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
    socket.bind("tcp://*:5555"); 
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
    cout << "TransportLayer::Receive " << " I am in server " << endl;

    static int count = 1;
    // Producer thread.
    while ( true ){

        zmq::message_t request;
        string protoBuf;

        socket.recv(&request);

        uint16_t id = *((uint16_t*)request.data());

        cout << "TransportLayer : " << "request.data:  " << request.data() << endl;
        cout << "TransportLayer : count " << count << endl; count = count + 1;
        cout << "TransportLayer : request.data.size " << request.size() << endl;

        protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t));

        cout << "ProtoBuf : " << protoBuf << endl;

        InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();

        switch(id) {
            case TABLEMSG_ID:   cout << "Canlyser" << endl;
                                interfaceLayObj->ParseProtoBufTable(protoBuf);
                                break; 
            case LOGSNIPPET_ID: cout << "LogSnip" << endl;
                                interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
                                interfaceLayObj->logsnippetSignal(); // publish the signal
                                break;
            default:            break;
        }

        usleep(1);

    }

}

Solution

  • Q : "how to use multiple Publishers and a single Client, using C < C11?"

    So, the QNX-version was not explicitly stated, so let's work in general.

    As noted in ZeroMQ Principles in less than Five Seconds, the single Client ( being of a SUB-Archetype ) may zmq_connect( ? ), however at a cost of managing some, unknown for me, way how all the other, current plus any future PUB-s were let to zmq_bind(), after which to let somehow let the SUB learn where to zmq_connect( ? ), so that to get some news from the newly bound PUB-peer.

    So it would be a way smarter to make the single SUB-agent to perform a zmq_bind() and let any of the current or future PUB-s perform zmq_connect() as they come, directed to the single, static, known SUB's location ( this does not say, they cannot use any of the available transport-classes - one inproc://, another one tcp://, some ipc://, if QNX permits & system architecture requires to do so ( and, obviously, supposing the SUB-agent has exposed a properly configured AccessNode for receiving such connections ).

    Next, your SUB-Client has to configure its subscription filtering topic-list: be it an order to "Do Receive EVERYTHING!" :

    ...
    retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 );
    assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " );
    ...
    

    Given this works, your next duty is to make the setup robust enough ( an explicit ZMQ_LINGER setting to 0, access-policies, security, scaled-resources, L2/L3-network protective measures, etc ).

    And you are done to harness the ZeroMQ just fit right to your QNX-system design needs.