Search code examples
c#netmq

NetMQ How to detect slow subscribers using HighWatermark option and disconnect them by publisher?


I put simply delay before subscriber reads next frame, so I expected that this will simulate slow subscriber and give some effects of HighWatermark option. I don't observe anything, subscriber doesn't skip (drop) any messages neither slow down the publisher. I have run 1 publisher and x subscribers.

I try to play with a pub-sub example taken from documentation https://netmq.readthedocs.io/en/latest/pub-sub/

Is there any way to detect that subscriber is slow? I mean the number of queued messages to be received exceeded the HighWatermark value. Shall I expect any exception or event in NetMqMonitor? I am also looking if there is a option to disconnect such a slow subscriber.

using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace Sub
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Subscriber started for Topic : {0}", topic);
            using (var subSocket = new SubscriberSocket())
            {
                subSocket.Options.ReceiveHighWatermark = 100;
                subSocket.Connect("tcp://localhost:12345");
                subSocket.Subscribe("topic1");
                Console.WriteLine("Subscriber socket connecting...");
                while (true)
                {
                    string messageTopicReceived = subSocket.ReceiveFrameString();
                    string messageReceived = subSocket.ReceiveFrameString();
                    Console.WriteLine($"{messageTopicReceived} {messageReceived}");
                    Thread.Sleep(50);
                }
            }
        }
    }
}
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

namespace Pub
{
    class Program
    {
        static void Main()
        {
            using (var pubSocket = new PublisherSocket())
            {
                Console.WriteLine("Publisher socket binding...");
                pubSocket.Options.SendHighWatermark = 100;
                pubSocket.Bind("tcp://*:12345");
                Thread.Sleep(1000);
                for (var i = 0; i < 100000; i++)
                {
                    var msg = "msg-" + i;
                    Console.WriteLine("Sending message : {0}", msg);
                    pubSocket.SendMoreFrame("topic1").SendFrame(msg);
                    //Thread.Sleep(1);
                }
            }
        }
    }
}

Solution

  • NetMqMonitor doesn't trigger events to indicate that some messages have been dropped. Subscriber has to verify the generation time and decide itself what to do. http://zguide.zeromq.org/php:chapter5#Slow-Subscriber-Detection-Suicidal-Snail-Pattern

    Ps. To observe effect of ReceiveHighWatermark the example code from question has to be modified. The messages were too small.