Search code examples
c++zeromq

How to send requests to ZeroMQ workers when router is bound to inproc address?


I am currently working with an existing program that uses ZeroMQ, and it is structured as follows:

  1. Clients (REQ) - Router - Proxy - Dealer - Workers (REP)
  2. The Router is bound to an inproc address
  3. The Dealer is bound to a TCP address (0.0.0.0:5555)
  4. The Workers are located remotely

I need to send requests to and receive replies from the Workers in my program, but I am facing difficulties because the Router is bound to an inproc address.

Assuming I cannot modify the existing program, is there a way to implement this functionality?

The current structure can be illustrated with the following diagram:

enter image description here

The current structure in code (C++) is as follows:

#include <chrono>
#include <functional>
#include <thread>
#include <vector>

#include <spdlog/spdlog.h>

#include <zmq.hpp>

void Proxy(zmq::context_t& context)
{
    zmq::socket_t clients { context, zmq::socket_type::router };
    zmq::socket_t workers { context, zmq::socket_type::dealer };

    clients.bind("inproc://router");
    workers.bind("tcp://0.0.0.0:5555");

    zmq::proxy(clients, workers);
}

void Client(zmq::context_t& context)
{
    using namespace std::chrono_literals;

    zmq::socket_t socket { context, zmq::socket_type::req };

    socket.connect("inproc://router");

    for (;;)
    {
        socket.send(zmq::str_buffer("some request"));

        zmq::message_t reply;

        socket.recv(reply);

        spdlog::trace("reply: {}", reply.to_string());

        std::this_thread::sleep_for(1s);
    }
}

void Worker(zmq::context_t& context) // actually located remotely
{
    zmq::socket_t socket { context, zmq::socket_type::rep };

    socket.connect("tcp://localhost:5555");

    for (;;)
    {
        zmq::message_t request;

        socket.recv(request);

        spdlog::trace("request: {}", request.to_string());

        socket.send(zmq::str_buffer("some reply"));
    }
}

int main()
{
    zmq::context_t context { 1 };

    std::jthread proxy { Proxy, std::ref(context) };

    std::vector<std::jthread> workers;

    for (int i = 0; i < 4; ++i)
    {
        workers.emplace_back(Worker, std::ref(context));
    }

    std::vector<std::jthread> clients;

    for (int i = 0; i < 4; ++i)
    {
        clients.emplace_back(Client, std::ref(context));
    }

    return 0;
}

I have tried various methods, but none of them seem to work properly.

For example, I created a Router - Proxy - Dealer structure in my program, connected the Dealer to the existing Dealer address (0.0.0.0:5555), and attempted to send requests to the Router.

Although the request was sent, it appears that the Workers are not receiving the request. (Using Wireshark, I confirmed that the requests are sent to the Dealer, but the Dealer does not forward the requests to the Workers.)

Is there any way to achieve this without implementing socket communication from scratch?

Thank you for your assistance!


Solution

  • When you say, "I cannot modify the existing program", if that means "zero source code changes" then you're not going to be able to tap into existing router; it's accessible only from inside the process.

    However, if you can you could simply add one line to bind the Proxy's clients socket twice, once to inproc as is, the second to something else, either ipc or tcp (ipc would be better, but doesnt' work on Windows). That way, clients inside the existing process can connect via the inproc transport, and clients in another process on the same machine can connect to the same socket using ipc. Something like:

    zmq::socket_t clients { context, zmq::socket_type::router };
    zmq::socket_t workers { context, zmq::socket_type::dealer };
    
    clients.bind("inproc://router");
    clients.bind("ipc:///path/to/mynamedpipe");
    workers.bind("tcp://0.0.0.0:5555");
    

    and the new process's clients would:

    socket.connect("ipc:///path/to/mynamedpipe");
    

    This doesn't change the proxy in any behaviorial way and preserves the rest of the application as is, which may be what you're hoping to achieve.