Tags: c#, zeromq, publish-subscribe, netmq

Can't get NetMQ pub-sub pattern to work with ReceiveReady


I'm trying my hand at NetMQ (3.3.3.4) and building a pub-sub pattern.

I want a host/server to listen to all incoming data on one port (9000) and forward the data to all clients/subscribers on another port (9001).

The clients will then send data on 9000 and receive all messages sent (by whomever) on 9001.

Following the documentation, I wrote something like the code below, but I can't get it to work. The main reason, I believe, is that ReceiveReady is never raised!

How I believe it should work:

  • client.Publish should trigger host.SubscriberSocket_ReceiveReady, whose first line receives the data and whose second line passes it along to the other socket
  • Once the data has been passed along, it should arrive in the infinitely running Task in the client

Results:

  • Breakpoints on the lines marked "// This line is never reached" are never hit
  • There are no exceptions anywhere.
  • Switching the ports on the host so that publish = 9000 and subscribe = 9001 has no effect
  • Windows Firewall is turned off, so there should not be any blocking
  • It makes no difference whether I pass the address to the PublisherSocket constructor or call _publishSocket.Bind(address) in Host and _publishSocket.Connect(address) in Client

What am I doing wrong?

Host

public class MyNetMQHost {

    private NetMQSocket _publishSocket;
    private NetMQSocket _subscribeSocket;
    private NetMQPoller _poller;

    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
        Task.Factory.StartNew(() => {
            using (_publishSocket = new PublisherSocket(publishAddress))
            using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
            using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
                _subscribeSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
                _poller.Run();
            }
        });
    }

    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
        var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
        _publishSocket.SendMultipartBytes(data);
    }
}

Client

public class MyNetMQClient {

    private readonly NetMQSocket _publishSocket;
    private readonly NetMQSocket _subscribeSocket;

    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
        _publishSocket = new PublisherSocket(publishAddress);
        _subscribeSocket = new SubscriberSocket(subscribeAddress);

        Task.Factory.StartNew(() => {
            while (true) {
                byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
                int one = 1; // This line is never reached
            }
        });
    }

    public void Publish(byte[] data) {
        _publishSocket.SendFrame(data);
    }
}

Tester

public class Tester {
    public void MyTester() {
        MyNetMQHost host = new MyNetMQHost();
        MyNetMQClient client = new MyNetMQClient();

        client.Publish(Encoding.Unicode.GetBytes("Hello world!"));
    }
}

Solution

  • Neither your broker nor your client ever calls Subscribe. On the broker, call subscriber.Subscribe("") to subscribe to everything. On the client, subscribe to whatever topics you want.

    In your broker you should actually use XSubscriberSocket and XPublisherSocket, which pass subscriptions along from the clients to the publishers. That way you don't need the subscribe-all call on the broker. You can use the Proxy class for that.
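
    As a rough illustration of both points, here is a minimal sketch that runs the broker and one client in a single process. It reuses the ports and the "@" (bind) and ">" (connect) prefixes from the question; the class name PubSubSketch, the 500 ms pause and the 2 second receive timeout are assumptions made for the example, not part of the original code. The pause is there because pub-sub drops messages published before a subscription has reached the publisher, which is likely to affect the Tester in the question as well.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

public static class PubSubSketch {

    public static void Main() {
        // Broker side: clients publish to 9000 (XSUB) and subscribe on 9001 (XPUB).
        using (var xsub = new XSubscriberSocket("@tcp://localhost:9000"))
        using (var xpub = new XPublisherSocket("@tcp://localhost:9001"))
        // Client side: one publisher and one subscriber, both connecting to the broker.
        using (var publisher = new PublisherSocket(">tcp://localhost:9000"))
        using (var subscriber = new SubscriberSocket(">tcp://localhost:9001")) {

            // The proxy forwards messages from xsub to xpub and
            // forwards subscriptions from xpub back to xsub.
            var proxy = new Proxy(xsub, xpub);
            var brokerTask = Task.Factory.StartNew(proxy.Start, TaskCreationOptions.LongRunning);

            // Without this call the subscriber receives nothing.
            // An empty prefix matches every message.
            subscriber.Subscribe("");

            // Assumption: a crude wait so the connections and the subscription
            // are in place before the first message is published.
            Thread.Sleep(500);

            publisher.SendFrame(Encoding.Unicode.GetBytes("Hello world!"));

            byte[] frame;
            if (subscriber.TryReceiveFrameBytes(TimeSpan.FromSeconds(2), out frame)) {
                Console.WriteLine(Encoding.Unicode.GetString(frame));
            } else {
                Console.WriteLine("No message received within the timeout.");
            }

            // Stop the proxy before the sockets it uses are disposed.
            proxy.Stop();
            brokerTask.Wait();
        }
    }
}
```

    If you would rather keep your own poller and ReceiveReady handler in the host, the smaller change is to call Subscribe("") on the host's SubscriberSocket before _poller.Run(), and on the client's SubscriberSocket before the receive loop starts.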