zeromq pgm-protocol

PGM Receive very slow causing messages to be dropped?


I'm looking into ZeroMQ for its PGM support. I'm running on Windows (in a VirtualBox VM with macOS as the host, in case that matters), using the NetMQ library.

The test I want to do is very simple: send messages from A to B as fast as possible...

First I used TCP as the transport; this easily reached more than 150,000 messages per second, with two receivers keeping pace. Then I wanted to test PGM; all I did was replace the address "tcp://*:5556" with "pgm://239.0.0.1:5557" on both sides.

Now, the PGM tests give very strange results: the sender easily gets to more than 200,000 messages/s; the receiver, though, manages to process only about 500 messages/s!?

So, I don't understand what is happening. After slowing down the sender (sleep 10ms after each message, since otherwise it's practically impossible to investigate the flow) it appears to me that the receiver is trying to keep up, initially sees every message passing by, then chokes, misses a range of messages, then tries to keep up again... I played with the HWM and Recovery Interval settings, but that didn't seem to make much difference (?!).

Can anyone explain what's going on?

Many thanks, Frederik

Note: Not sure if it matters, but as far as I understand, I don't use OpenPGM. I just downloaded the ZeroMQ setup and enabled 'Multicasting Support' in Windows.

This is the Sender code:

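using System;
using System.Text;
using System.Timers;
using NetMQ; // Assumption: in some older NetMQ releases ZmqSocketType lives in NetMQ.zmq instead.

class MassSender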
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;

    public static void Main(string[] args)
    {
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        SendMessages_0MQ_NetMQ(timer);
    }

    private static void SendMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
            {
                //publisher.Bind("tcp://*:5556");
                publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    string message = GetMessage();

                    byte[] body = Encoding.UTF8.GetBytes(message);
                    publisher.Send(body);
                }
            }
        }
    }

    private static string GetMessage()
    {
        return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
    }
    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
    }
}

and the Receiver:

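using System;
using System.Text;
using System.Timers;
using NetMQ; // Assumption: in some older NetMQ releases ZmqSocketType lives in NetMQ.zmq instead.

class MassReceiver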
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;
    private static string lastMessage = String.Empty;

    static void Main(string[] args)
    {
        // Assume that sender and receiver are started simultaneously.
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        ReceiveMessages_0MQ_NetMQ(timer);
    }

    private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
            {
                subscriber.Subscribe(""); // Subscribe to everything

                //subscriber.Connect("tcp://localhost:5556");
                subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    byte[] body = subscriber.Receive();
                    messageCounter++; // Count only after a message has actually arrived.

                    string message = Encoding.UTF8.GetString(body);                        
                    lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.  
                }
            }
        }
    }

    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
    }
}

Solution

  • What is the size of each message?

    You are not using OpenPGM; you are using what is called MS-PGM (Microsoft's implementation of PGM).

    Either way, you will probably have to raise the MulticastRate of the socket. It defaults to 100 kbit/s, which is far below the rate at which your sender produces data.

    Also, what kind of network are you using?
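
To see why the default rate matters, here is a rough back-of-the-envelope sketch. It is only an estimate under stated assumptions: 1 kbit is taken as 1000 bits, and each message is assumed to cost about 25 bytes on the wire (the roughly 20-byte "Hello:Message N" payload plus a few bytes of framing). The class and method names are made up for this illustration and are not part of NetMQ.

```csharp
using System;

static class RateEstimate
{
    // Upper bound on messages per second for a given multicast rate limit.
    // rateKbit: rate limit in kilobits per second (1 kbit taken as 1000 bits).
    // bytesPerMessage: payload plus per-message overhead in bytes (an estimate).
    public static int MaxMessagesPerSecond(int rateKbit, int bytesPerMessage)
    {
        if (bytesPerMessage <= 0)
            throw new ArgumentOutOfRangeException(nameof(bytesPerMessage));

        long bytesPerSecond = rateKbit * 1000L / 8;
        return (int)(bytesPerSecond / bytesPerMessage);
    }

    static void Main()
    {
        // "Hello:Message 123456" is 20 bytes in UTF-8; 25 bytes assumes a little framing.
        Console.WriteLine(MaxMessagesPerSecond(100, 25));     // 500
        Console.WriteLine(MaxMessagesPerSecond(100000, 25));  // 500000
    }
}
```

With the default of 100 kbit/s this comes out at about 500 messages per second, which is in line with what the receiver in the question reports. As far as I know, NetMQ exposes this setting as Options.MulticastRate (in kbit/s) on the socket; check that against the NetMQ version you have installed, and set it before Bind/Connect.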