java zeromq distributed-computing publish-subscribe distributed-system

ZeroMQ PUB/SUB topology on the same machine


Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ? I'd like a set of clients, each of which can make standard Request/Response calls using SocketType.REQ and SocketType.REP, but which can also receive notifications using SocketType.SUB and SocketType.PUB.

I've tried to implement this topology, taken from here, although my version only has one publisher.

Here is my publisher:

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class ZMQServerSmall 
{   
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) 
        {           
            ZMQ.Socket rep = context.createSocket(SocketType.REP);
            rep.bind("tcp://*:5555");

            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind("tcp://*:7777");           

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");

                pub.sendMore("Message header");
                pub.send("Message body");
            }
        }
    }
}

Here is my proxy (I included a Listener to try to see what's going on):

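import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;

public class ZMQForwarderSmall 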
{
    public static void main(String[] args) 
    {       
        try 
        (
            ZContext context = new ZContext();   
        )
        {
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);            
            // connect() needs a concrete host; the "*" wildcard is only meaningful for bind()
            frontend.connect("tcp://localhost:7777");

            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:6666");

            IAttachedRunnable runnable = new Listener();
            Socket listener = ZThread.fork(context, runnable);

            ZMQ.proxy(frontend, backend, listener);
        }
        catch (Exception e) 
        {
            System.err.println(e.getMessage());
        } 
    }

    private static class Listener implements IAttachedRunnable 
    {    
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe) 
        {
            while (true) 
            {
                ZFrame frame = ZFrame.recvFrame(pipe);
                if (frame == null)
                    break; // Interrupted
                System.out.println(frame.toString());
                frame.destroy();
            }
        }
    }
}

Here is my Subscriber:

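import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class ZMQClientSmall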
{   
    public static void main(String[] args) throws IOException
    {
        String input;

        try 
        (
            ZContext context = new ZContext();
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) 
        { 
            ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
            reqSocket.connect("tcp://localhost:5555");

            ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
            subSocket.connect("tcp://localhost:6666");

            subSocket.subscribe("".getBytes(ZMQ.CHARSET));

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);

                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println("Notification received: " + address + " : " + contents);
            }
        }   
    }   
}

Here is the test. I open four terminals; 1 publisher, 1 proxy, and 2 clients. When I make a request in either of the two client terminals, I expect to see a notification in both, but instead I only see the notification in the terminal that made the request. I know that both clients are using the same address (localhost:6666), but I'd hoped that the proxy would solve that problem.

Can anyone see anything obviously wrong here?


Solution

  • Q : Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ?

    Yes, it is. No doubt about that.


    Check the code: the order of execution inside the client loop is responsible for what you see.

    Once [Client] No. 1 gets a line of input from .readLine(), it enters the loop body:

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);
    
                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println(           "Notification received: "
                                  + address + " : "
                                  + contents
                                    );
            }
    

    It then .send()-s over REQ and blocks, waiting for the REP response.

    If [Client] No. 2 also gets a line of manual input from .readLine(), it enters the same while loop, yet it proceeds no farther than its own blocking wait for the REP response. That response cannot be .recv()-ed until the REP side has finished serving No. 1. So while No. 1 may already have left its blocking .recv(), No. 2 still hangs inside its own blocking .recv(), waiting for the next REP-side response (which may come, but need not). Meanwhile No. 1 has already proceeded to the PUB/SUB .recv() calls, where it receives the notification (No. 2 does not), and then goes straight back to blocking on .readLine() for the next input, and so on, indefinitely.

    So this sequence of an in-loop REQ step followed by a SUB step, repeated across any number N > 1 of [Client] instances, effectively forms a mutually exclusive tick-tock clock: each client reads PUB-ed messages only on its own turn, so delivery is interleaved N ways (and that is before counting the manual, blocking .readLine() step).
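    The interleaving described above can be illustrated without any sockets. The following is only a toy model under stated assumptions: a plain in-memory queue stands in for each client's SUB receive buffer, one queue entry stands in for a whole two-frame notification, and every published notification is assumed to have arrived before the non-blocking read. The names TickTockSimulation, Client and request are hypothetical and do not come from the question's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TickTockSimulation {
    /** One simulated client: a queue standing in for its SUB socket's receive buffer. */
    static final class Client {
        final Deque<String> subQueue = new ArrayDeque<>();
        final List<String> printed = new ArrayList<>();
    }

    /** Simulates one loop iteration of `requester`: REQ/REP round trip, broadcast, one SUB read. */
    static void request(Client requester, List<Client> all, String input) {
        // Server side: after replying, publish one notification to every subscriber.
        for (Client c : all) {
            c.subQueue.add("notification for '" + input + "'");
        }
        // Client side: only the requester reaches its non-blocking SUB read.
        String notification = requester.subQueue.poll(); // null if empty, like DONTWAIT
        requester.printed.add(String.valueOf(notification));
    }

    public static void main(String[] args) {
        Client c1 = new Client();
        Client c2 = new Client();
        List<Client> all = List.of(c1, c2);

        // Client 1 makes two requests; client 2 stays blocked on readLine().
        request(c1, all, "a");
        request(c1, all, "b");
        System.out.println("c1 printed: " + c1.printed);
        System.out.println("c2 printed: " + c2.printed + ", still queued: " + c2.subQueue.size());

        // Client 2 finally makes a request and reads the oldest queued notification.
        request(c2, all, "c");
        System.out.println("c2 printed: " + c2.printed + ", still queued: " + c2.subQueue.size());
        System.out.println("c1 still queued: " + c1.subQueue.size());
    }
}
```

    In this model client 2 does receive every notification, but it only looks at its queue when it makes a request of its own, and what it reads then is the oldest unread entry rather than the one triggered by its request.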

    ZMQServerSmall is not aware of anything wrong: it .send()-s its reply, in order, to whichever counterparty it .recvStr()-ed a request from over REQ/REP, and it PUB-s to all counterparties. Those counterparties do not read autonomously, though. Each one reads only after it has been manually unblocked by .readLine(), and only after its episodic (potentially indefinitely blocking) REQ/REP step may it .recv() its next, so far unread, message part. Note also that I do not see any code on the SUB side that explicitly handles the presence or absence of the multipart flag in its .recv() operations.

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");
    
                pub.sendMore("Message header");
                pub.send("Message body");
            }
    

    In the meantime, ZMQServerSmall sends (N - 1) times more messages down the PUB broadcast lane, so the tick-tock "pendulum" of the mutually blocking REQ/SUB loop is not 2-state but N-state on the receiving sides: all clients receive the same flow of PUB-ed messages, yet they read it interleaved by the N steps of the REQ/REP stepping.
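
    One possible way to make the SUB side read autonomously, and to honour the multipart flag while doing so, is to move the subscription into its own thread. What follows is a minimal sketch, not a definitive fix. It assumes JeroMQ is on the classpath. NotificationReader, recvAllFrames and startSubscriber are hypothetical names. Giving the reader thread its own ZContext is a choice made here so that no socket is shared across threads.

```java
import java.util.ArrayList;
import java.util.List;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

final class NotificationReader {
    /** Blocks for the first frame, then keeps reading while the "more" flag is set. */
    static List<String> recvAllFrames(ZMQ.Socket socket) {
        List<String> frames = new ArrayList<>();
        String first = socket.recvStr(0);          // blocking receive of the first frame
        if (first == null) {
            return frames;                         // nothing received, e.g. interrupted
        }
        frames.add(first);
        while (socket.hasReceiveMore()) {          // multipart flag of the last frame read
            String next = socket.recvStr(0);
            if (next == null) {
                break;
            }
            frames.add(next);
        }
        return frames;
    }

    /** Starts a daemon thread that owns its own SUB socket and prints every notification. */
    static Thread startSubscriber(String endpoint) {
        Thread reader = new Thread(() -> {
            // Assumption: a separate context per thread, so no socket crosses threads.
            try (ZContext context = new ZContext()) {
                ZMQ.Socket sub = context.createSocket(SocketType.SUB);
                sub.connect(endpoint);
                sub.subscribe("".getBytes(ZMQ.CHARSET));   // subscribe to everything
                while (!Thread.currentThread().isInterrupted()) {
                    List<String> frames = recvAllFrames(sub);
                    if (frames.isEmpty()) {
                        break;
                    }
                    System.out.println("Notification received: " + String.join(" : ", frames));
                }
            } catch (ZMQException e) {
                // Assumption: any ZeroMQ error here is treated as a signal to stop reading.
            }
        });
        reader.setDaemon(true);
        reader.start();
        return reader;
    }
}
```

    With something like this in place, ZMQClientSmall could call NotificationReader.startSubscriber("tcp://localhost:6666") once before entering its .readLine() loop and drop the two subSocket.recvStr(ZMQ.DONTWAIT) calls, leaving the loop with only the REQ/REP round trip.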