Looking at this topic: zeromq pub/sub example in c (libzmq)
The code there works in windows (Visual Studio 2019 compiler and vcpkg installed zeromq - zeromq:x64-windows 4.3.4#6)
#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"
int main(int argc, char const* argv[])
{
DWORD thread_id = GetCurrentThreadId();
// create a context that will be shared by all threads
void* context = zmq_ctx_new();
const char* endpoint = "tcp://127.0.0.1:4040";
std::thread publisher_thread = std::thread([&]
{
void* publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, endpoint);
//assert(rc == 0);
while (1)
{
rc = zmq_send(publisher, "Hello World!", 12, 0);
//assert(rc == 12);
}
zmq_close(publisher);
});
// quick hack to make sure publisher thread is up and running before subscribers connect.
Sleep(1000);
std::thread subscriber_thread_all = std::thread([&]
{
void* subscriber = zmq_socket(context, ZMQ_SUB);
int rc = zmq_connect(subscriber, endpoint);
//assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
//assert(rc == 0);
char message[12];
while (1)
{
rc = zmq_recv(subscriber, message, 12, 0);
//assert(rc != -1);
printf("%s\n", message);
}
zmq_close(subscriber);
});
subscriber_thread_all.join();
publisher_thread.join();
zmq_ctx_destroy(context);
return 0;
}
This works and produces expected results with the sub thread receiving all the messages from the publisher.
However, If I add any subscription other than all, no filtering happens. For example, making the slight modification above to send alternating messages in the publisher and subscribe to those starting with "H":
#include <Windows.h>
#include <thread>
#include <vector>
#include <utility>
#include "C:\Source\vcpkg\installed\x64-windows\include\zmq.h"
int main(int argc, char const* argv[])
{
DWORD thread_id = GetCurrentThreadId();
// create a context that will be shared by all threads
void* context = zmq_ctx_new();
const char* endpoint = "tcp://127.0.0.1:4040";
std::thread publisher_thread = std::thread([&]
{
void* publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, endpoint);
//assert(rc == 0);
int message_counter(0);
while (1)
{
if (message_counter % 2 == 0)
{
rc = zmq_send(publisher, "Hello World!", 12, 0);
}
else
{
rc = zmq_send(publisher, "Goodbye World!", 12, 0);
}
++message_counter;
//assert(rc == 12);
}
zmq_close(publisher);
});
// quick hack to make sure publisher thread is up and running before subscribers connect.
Sleep(1000);
std::thread subscriber_thread_all = std::thread([&]
{
void* subscriber = zmq_socket(context, ZMQ_SUB);
int rc = zmq_connect(subscriber, endpoint);
//assert(rc == 0);
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
//assert(rc == 0);
char message[12];
while (1)
{
rc = zmq_recv(subscriber, message, 12, 0);
//assert(rc != -1);
printf("%s\n", message);
}
zmq_close(subscriber);
});
subscriber_thread_all.join();
publisher_thread.join();
zmq_ctx_destroy(context);
return 0;
}
results still in all messages being delivered to the subscriber instead of just those starting with H.
I have also tried this with message envelopes and other transmit protocols but those don't seem to work either.
You've edited the line that used to subscribe to everything:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
into this:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 0);
but you forgot to change the size of the option value from 0 to 1. Since the length is still zero, you're still subscribed to everything. It should of course be:
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "H", 1);