c#, publish-subscribe, netmq

NetMQ subscriber blocking with a published message


I have a Message Publishing Server class which creates a PUB socket at construction, with the following code:

    this.context = NetMQContext.Create();
    this.pubSocket = this.context.CreatePublisherSocket();
    var portNumber = this.installerSettings.PublisherPort;
    this.pubSocket.Bind("tcp://127.0.0.1:" + portNumber);

Sending a message using messagePublishingServer.Publish(message) executes:

    this.pubSocket.SendMoreFrame(string.Empty).SendFrame(message);

The following xBehave test...

    [Scenario]
    public void PublishMessageScenario()
    {
        MessagePublishingServer messagePublishingServer = null;
        NetMQContext context;
        NetMQ.Sockets.SubscriberSocket subSocket = null;
        string receivedMessage = null;

        "Given a running message publishing server"._(() =>
        {
            var installerSettingsManager = A.Fake<IInstallerSettingsManager>();
            var settings = new InstallerSettings { PublisherPort = "5348" };
            A.CallTo(() => installerSettingsManager.Settings).Returns(settings);
            messagePublishingServer = new MessagePublishingServer(installerSettingsManager);
        });

        "And a subscriber connected to the publishing server"._(() =>
        {
            context = NetMQContext.Create();
            subSocket = context.CreateSubscriberSocket();
            subSocket.Options.ReceiveHighWatermark = 1000;
            subSocket.Connect("tcp://127.0.0.1:5348");
            subSocket.Subscribe(string.Empty);
        });

        "When publishing a message"._(() =>
        {
            messagePublishingServer.Publish("test message");

            // Receive the topic
            subSocket.ReceiveFrameString();

            // and the message
            receivedMessage = subSocket.ReceiveFrameString();
        });

        "Then the subscriber must have received it"._(() =>
        {
            receivedMessage.Should().NotBeNullOrEmpty();
            receivedMessage.Should().Be("test message");
        });
    }

... blocks on the first subSocket.ReceiveFrameString() call, which I find unexpected. Shouldn't the subscriber socket have queued the published message until the receive is called?


Solution

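  • A publisher is like a radio station: if you were not connected and subscribed at the moment the publisher sent the message, you miss it. Connecting and subscribing are asynchronous, so the test publishes before the subscription has reached the publisher, and the message is dropped rather than queued. My tip is to sleep for 100 ms after the subscriber connects (only for testing).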
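
As a rough illustration of that tip, here is a minimal standalone sketch using the same NetMQContext-style API as the question. The class name SlowJoinerSketch and the method PublishAndReceive are hypothetical names made up for this example, and the 100 ms delay is a guess that usually suffices on loopback rather than a guaranteed bound. Without the sleep, the receive can block indefinitely, as described in the question.

```csharp
using System;
using System.Threading;
using NetMQ;

// Hypothetical helper for illustration; not part of the original code.
public static class SlowJoinerSketch
{
    public static string PublishAndReceive(string address, string message)
    {
        using (var context = NetMQContext.Create())
        using (var pubSocket = context.CreatePublisherSocket())
        using (var subSocket = context.CreateSubscriberSocket())
        {
            pubSocket.Bind(address);

            subSocket.Connect(address);
            subSocket.Subscribe(string.Empty);

            // Test-only workaround: wait for the connection and the
            // subscription to reach the publisher before sending.
            // The 100 ms value is an assumption, not a guarantee.
            Thread.Sleep(100);

            // Same two-frame layout as in the question: empty topic, then payload.
            pubSocket.SendMoreFrame(string.Empty).SendFrame(message);

            // First frame is the (empty) topic, second frame is the message.
            subSocket.ReceiveFrameString();
            return subSocket.ReceiveFrameString();
        }
    }

    public static void Main()
    {
        Console.WriteLine(PublishAndReceive("tcp://127.0.0.1:5348", "test message"));
    }
}
```

In the xBehave scenario from the question, the equivalent change would be a Thread.Sleep(100) at the end of the "And a subscriber connected to the publishing server" step, after subSocket.Subscribe(string.Empty).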