Search code examples
cwindowszeromq

zeromq pub/sub example in c (libzmq) with working filters (on windows)?


Looking at this topic: zeromq pub/sub example in c (libzmq)

The code there works in windows (Visual Studio 2019 compiler and vcpkg installed zeromq - zeromq:x64-windows 4.3.4#6)

#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"

int main(int argc, char const* argv[])
{
    DWORD thread_id = GetCurrentThreadId();
    // create a context that will be shared by all threads
    void* context = zmq_ctx_new();
    const char* endpoint = "tcp://127.0.0.1:4040";

    std::thread publisher_thread = std::thread([&]
    {
        void* publisher = zmq_socket(context, ZMQ_PUB);
        int rc = zmq_bind(publisher, endpoint);
        //assert(rc == 0);

        while (1)
        {
            rc = zmq_send(publisher, "Hello World!", 12, 0);
            //assert(rc == 12);
        }

        zmq_close(publisher);
    });

    // quick hack to make sure publisher thread is up and running before subscribers connect.
    Sleep(1000);

    std::thread subscriber_thread_all = std::thread([&]
    {
        void* subscriber = zmq_socket(context, ZMQ_SUB);
        int rc = zmq_connect(subscriber, endpoint);
        //assert(rc == 0);
        rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
        //assert(rc == 0);

        char message[12];

        while (1)
        {
            rc = zmq_recv(subscriber, message, 12, 0);
            //assert(rc != -1);
            printf("%s\n", message);
        }
        zmq_close(subscriber);
    });

    subscriber_thread_all.join();
    publisher_thread.join();

    zmq_ctx_destroy(context);
    return 0;
}

This works and produces expected results with the sub thread receiving all the messages from the publisher.

However, If I add any subscription other than all, no filtering happens. For example, making the slight modification above to send alternating messages in the publisher and subscribe to those starting with "H":

#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"

int main(int argc, char const* argv[])
{
    DWORD thread_id = GetCurrentThreadId();
    // create a context that will be shared by all threads
    void* context = zmq_ctx_new();
    const char* endpoint = "tcp://127.0.0.1:4040";

    std::thread publisher_thread = std::thread([&]
    {

        void* publisher = zmq_socket(context, ZMQ_PUB);
        int rc = zmq_bind(publisher, endpoint);
        //assert(rc == 0);

        int message_counter(0);
        while (1)
        {
            if (message_counter % 2 == 0)
            {
                rc = zmq_send(publisher, "Hello World!", 12, 0);
            }
            else
            {
                rc = zmq_send(publisher, "Goodbye World!", 12, 0);
            }
            ++message_counter;
            //assert(rc == 12);
        }

        zmq_close(publisher);
    });

    // quick hack to make sure publisher thread is up and running before subscribers connect.
    Sleep(1000);

    std::thread subscriber_thread_all = std::thread([&]
    {
        void* subscriber = zmq_socket(context, ZMQ_SUB);
        int rc = zmq_connect(subscriber, endpoint);
        //assert(rc == 0);
        rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
        //assert(rc == 0);

        char message[12];

        while (1)
        {
            rc = zmq_recv(subscriber, message, 12, 0);
            //assert(rc != -1);
            printf("%s\n", message);
        }
        zmq_close(subscriber);
    });

    subscriber_thread_all.join();
    publisher_thread.join();

    zmq_ctx_destroy(context);
    return 0;
}

results still in all messages being delivered to the subscriber instead of just those starting with H.

I have also tried this with message envelopes and other transmit protocols but those don't seem to work either.


Solution

  • You've edited the line that used to subscribe to everything:

    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    

    into this:

    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
    

    but you forgot to change the size of the option value from 0 to 1. Since the length is still zero, you're still subscribed to everything. It should of course be:

    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 1);