Tags: c++, multithreading, zeromq, communication

ZeroMQ PubSub using inproc sockets hangs forever


I'm adapting a tcp PubSub example to use inproc with multiple threads. It ends up hanging forever.

My setup

  • macOS Mojave, Xcode 10.3
  • zmq 4.3.2

The source code reproducing the issue:

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <thread>
#include "zmq.h"

void hello_pubsub_inproc() {
    void* context = zmq_ctx_new();
    void* publisher = zmq_socket(context, ZMQ_PUB);
    printf("Starting server...\n");
    int pub_conn = zmq_bind(publisher, "inproc://*:4040");

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    printf("Collecting stock information from the server.\n");
    int sub_conn = zmq_connect(subscriber, "inproc://localhost:4040");
    sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);

    std::thread t_pub = std::thread([&]{
        const char* companies[2] = {"Company1", "Company2"};
        int count = 0;
        for(;;) {
            int which_company = count % 2;
            int index = (int)strlen(companies[0]);
            char update[12];
            snprintf(update, sizeof update, "%s",
                     companies[which_company]);
            zmq_msg_t message;
            zmq_msg_init_size(&message, index);
            memcpy(zmq_msg_data(&message), update, index);
            zmq_msg_send(&message, publisher, 0);
            zmq_msg_close(&message);
            count++;
        }
    });

    std::thread t_sub = std::thread([&]{
        int i;
        for(i = 0; i < 10; i++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, subscriber, 0);
            int length = (int)zmq_msg_size(&reply);
            char* value = (char*)malloc(length);
            memcpy(value, zmq_msg_data(&reply), length);
            zmq_msg_close(&reply);
            printf("%s\n", value);
            free(value);
        }
    });

    t_pub.join();

    // Give publisher time to set up.
    sleep(1);

    t_sub.join();

    zmq_close(subscriber);
    zmq_close(publisher);
    zmq_ctx_destroy(context);
}

int main (int argc, char const *argv[]) {
    hello_pubsub_inproc();
    return 0;
}

The result

Starting server...
Collecting stock information from the server.

I've also tried adding this before joining threads to no avail:

zmq_proxy(publisher, subscriber, NULL);

The workaround: replacing inproc with tcp fixes it instantly. But shouldn't inproc be the right choice for in-process use cases?

Quick research tells me that it couldn't have been the order of bind vs. connect, since that problem is fixed in my zmq version.

The example below suggests that I don't have a missing shared-context issue, because it doesn't use one:

ZeroMQ Subscribers not receiving message from Publisher over an inproc: transport class

I read in the Guide, in the section Signaling Between Threads (PAIR Sockets), that

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.

What does it mean by an empty subscription?

Where am I going wrong?


Solution

  • You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.


    Q : What does it mean by an empty subscription?

    This means setting ( configuring ) a subscription, which drives the Topic-list message-delivery filtering, using an empty subscription string.

    Q : Where am I going wrong?

    Here :

    // sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);   // Wrong: NULL topic
       sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);  // Empty string
    
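    For contrast, a non-empty subscription string acts as a simple prefix filter on the raw message bytes. A hypothetical variant for the question's payloads, which would deliver only the "Company1" updates and silently drop the rest:

       sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "Company1", 8);  // prefix match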

    There is a second problem here, with the endpoint syntax and naming rules :

    // int pub_conn = zmq_bind(publisher, "inproc://*:4040");
       int pub_conn = zmq_bind(publisher, "inproc://<aStringWithNameMax256Chars>");
    

    as the inproc:// transport class does not use any kind of external network stack, but maps the socket's I/O onto one or more memory locations ( a stack-less transport class that does not even require an I/O thread ).

    Given this, there is no "<address>:<port#>" to be interpreted by any ( here absent ) protocol, so the text after inproc:// is used as-is to identify which memory location the message data will go through.

    So "inproc://*:4040" does not get expanded; it is used literally as the name of an inproc:// endpoint identified as "*:4040". The subsequent .connect( "inproc://localhost:4040" ) must then, lexically, miss the prepared memory location "*:4040", because the two strings simply do not match.

    One might expect that .connect() to fail, but the error handling is silent: since versions 4.x it is no longer necessary to obey the historical requirement to first .bind() ( creating a "known" named memory location for inproc:// ) before a .connect() may be called against it. So v4.0+ will most probably not raise any error: the .bind( "inproc://*:4040" ) creates one landing zone, and the non-matching .connect( "inproc://localhost:4040" ) quietly waits for a landing zone that is never created, so the subscriber never receives a single message.
    So this ought fail to .connect() - error-handling might be silent, as since the versions +4.x there is not necessary to obey the historical requirement to first .bind() ( creating a "known" named-Memory-Location for inproc:// ) before one may call a .connect() to get it cross-connected with an "already existing" named-Memory-location, so the v4.0+ will most probably not raise any error on calling and creating a different .bind( "inproc://*:4040" ) landing-zone and next asking a non-matching .connect( "inproc://localhost:4040" ) ( which does not have a "previously prepared" landing-zone in an already existing named-Memory-location.