Tags: c#, .net, zeromq, netmq

How to publish one socket with multiple threads in ZeroMQ


I am using NetMQ, ZeroMQ's C# library, and I implemented it as follows.

The problem is that PublishAsync is invoked by multiple threads from the outside, each generating data to publish.

However, since the actual publishing is performed on a single thread, _pubThread, there seems to be a delay.

Please let me know how to solve this situation and why I can't work with multiple threads in one socket in ZeroMQ.

public class NetMqManager
{
    private NetMQQueue<(string, string)> _queue = new NetMQQueue<(string, string)>();

    private readonly Thread _subThread;

    private readonly Thread _pubThread;

    private readonly SubscriberSocket _subscriber;

    private readonly PublisherSocket _publisher;

    private readonly ZeroMqEndPoint _endPoint;

    public NetMqManager(ZeroMqEndPoint endPoint)
    {
        _endPoint = endPoint;

        _publisher = new PublisherSocket();
        _publisher.Options.SendHighWatermark = 1500;
        _publisher.SendReady += Publisher_SendReady;
        _pubThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _publisher };
            poller.Run();
        });

        _subscriber = new SubscriberSocket();
        _subscriber.Options.ReceiveHighWatermark = 1500; // SUB sockets receive, so set the receive-side HWM
        _subscriber.ReceiveReady += Subscriber_ReceiveReady;
        _subThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _subscriber };
            poller.Run();
        });
    }

    public async Task RunAsync()
    {
        await Task.Run(() =>
        {
            _publisher.Bind($"tcp://*:{_endPoint.PubPort}");
            _subscriber.Bind($"tcp://*:{_endPoint.SubPort}");

            _pubThread.Start();
            _subThread.Start();
        });
    }

    public async Task PublishAsync(string topic, string payload)
    {
        // async void is unawaitable and swallows exceptions; return a Task instead
        await Task.Run(() => _queue.Enqueue((topic, payload)));
    }

    public void SubscribeAsync(string topic)
    {
        // No await here, so the async keyword was dropped; note this still
        // touches _subscriber from the caller's thread, not the poller thread.
        _subscriber.Subscribe(topic);
    }

    private void Publisher_SendReady(object? sender, NetMQSocketEventArgs e)
    {
        var (topic, payload) = _queue.Dequeue();
        _publisher.SendMoreFrame(topic).SendFrame(payload);
    }

    private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        var topic = e.Socket.ReceiveFrameString();
        var payload = e.Socket.ReceiveFrameString();

        Console.WriteLine($"Topic: {topic}, Payload: {payload}");
    }
}

Solution

  • Q : "Please let me know how to solve this situation and why I can't work with multiple threads in one socket in ZeroMQ."

    Part A : "... multiple threads in one socket in ZeroMQ"

    The ZeroMQ native API lets you specify how many I/O-threads are to be used inside the Context()-instance, the core engine element, and also lets you specify, at an ultimate level of detail, each Socket-instance's affinity towards the thus prepared Context()-instance's pool-of-I/O-threads.

    If your NetMQ-wrapper delivers this to user-level code, go use it accordingly to increase the I/O-performance.
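
    For example, NetMQ exposes the size of its shared I/O-thread pool through the static NetMQConfig.ThreadPoolSize property; whether your NetMQ version also exposes a per-socket affinity option is worth verifying against its documentation, so treat the snippet below as a sketch of the idea, not a guaranteed API surface.

    using NetMQ;

    public static class IoTuning
    {
        // Sketch: enlarging the pool of I/O-threads NetMQ uses internally.
        // Set this before the first socket is created, so the context is
        // built with the requested pool size (default is 1).
        public static void ConfigureIoThreads(int ioThreads)
        {
            NetMQConfig.ThreadPoolSize = ioThreads;
        }
    }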

    Part B : performance losses to be avoided

    If your user-level code needs more inter-thread performance, avoid the expensive setup/encode/decode overheads of the tcp://-Transport Class whenever data flows only among threads of the same process - the inproc://-Transport Class is the one least loaded by any add-on overheads.
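
    A minimal sketch of such an intra-process link, assuming a PAIR/PAIR pairing and an illustrative inproc:// endpoint name (not taken from the original code):

    using System;
    using System.Threading;
    using NetMQ;
    using NetMQ.Sockets;

    // Sketch: an inproc:// link between two threads of the same process.
    using var server = new PairSocket();
    server.Bind("inproc://worker-link");            // bind first, then connect

    var worker = new Thread(() =>
    {
        using var client = new PairSocket();
        client.Connect("inproc://worker-link");
        client.SendFrame("ready");                  // no TCP framing / encoding cost
    });
    worker.Start();

    Console.WriteLine(server.ReceiveFrameString()); // prints "ready"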

    Part C : vulnerability or self-reliance?

    If your code relies on multi-frame ZeroMQ message compositions, you duplicate work already done by the native PUB/SUB-Archetype, where a pure left-aligned byte-wise string-match is performed against all activated subscriptions, so carrying the topic in a separate frame is another source of avoidable overhead and another principal inefficiency ( it may be an unwanted side-effect of a NetMQ-wrapper design compromise - IIRC this was a case I met some 6-8 years ago ).
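
    A sketch of the single-frame alternative, assuming pub and sub are an already-bound PublisherSocket and an already-connected, subscribed SubscriberSocket; the '|' delimiter is an illustrative choice, not part of any protocol:

    // Multi-frame composition ( what the question's code does ):
    pub.SendMoreFrame(topic).SendFrame(payload);

    // Single-frame alternative: the topic is simply the leading bytes of the
    // one and only frame - exactly what the PUB/SUB prefix-match inspects.
    pub.SendFrame($"{topic}|{payload}");

    // The subscriber still filters by the same left-aligned prefix:
    sub.Subscribe(topic);
    var frame = sub.ReceiveFrameString();
    var idx = frame.IndexOf('|');
    var receivedTopic = frame.Substring(0, idx);
    var receivedPayload = frame.Substring(idx + 1);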

    If your code strives to be robust and self-resilient, it shall never be built on the blind assumption that only fair & honest messaging participants are in the room. This matters less in inproc://-cases, yet when using any "network-open" Transport Class like { tcp:// | udp:// | pgm:// | epgm:// | tipc:// | norm:// | vmci:// }, one shall always handle the case when a non-compliant message arrives. Here, relying on each message (always & only) carrying two frames will throw your code into a deadlock or an exception at the very first arrival of an empty (zero-frame), single-frame or 3+frame message. Your .poll()/.recv() handling has to cope with whatever number of frames is actually present in the receiving loop, so as not to shoot yourself in the foot.
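
    A minimal sketch of a frame-count-agnostic handler, reusing the Subscriber_ReceiveReady hook from the question; ReceiveMultipartMessage() collects however many frames actually arrived, so the handler can validate the shape instead of assuming it:

    private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        // Pull the whole multipart message, whatever its frame count.
        NetMQMessage msg = e.Socket.ReceiveMultipartMessage();

        if (msg.FrameCount != 2)
        {
            // Non-compliant shape: log and drop, rather than blocking on a
            // second ReceiveFrameString() or mixing frames across messages.
            Console.WriteLine($"Dropped message with {msg.FrameCount} frame(s)");
            return;
        }

        var topic = msg[0].ConvertToString();
        var payload = msg[1].ConvertToString();
        Console.WriteLine($"Topic: {topic}, Payload: {payload}");
    }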

    Last but not least, The Zen of Zero :

    Martin SUSTRIK & Pieter HINTJENS' evangelisation has been clear and sound from the very beginning - never share, never block.

    While many efforts have been spent on so-called thread-safe modernisations of the framework, the design maxims are (imho) still to be preferred in user-level code, and "sharing" a socket-instance is a bad thing to do. Better keep things right: use inter-thread links ( like inproc://-s, or the NetMQQueue already present in the question ) to move work from many socket-less threads to one socket-owner, and focus on the primary logic of work on either side of these smart Signalling/Messaging meta-plane interconnections - a sketch of this shape follows after the list below.

    Better for performance,
    better for separation-of-concerns,
    better for debugging
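
    A minimal sketch of that many-producers-to-one-socket-owner shape, reusing the question's own NetMQQueue: the queue is registered with the poller and drained inside its ReceiveReady handler, so the PUB socket is only ever touched by the poller thread, while any number of producer threads merely call Enqueue ( class name and port are illustrative ):

    using System;
    using NetMQ;
    using NetMQ.Sockets;

    public sealed class PublisherOwner : IDisposable
    {
        private readonly NetMQQueue<(string Topic, string Payload)> _queue = new();
        private readonly PublisherSocket _publisher = new();
        private readonly NetMQPoller _poller;

        public PublisherOwner(int port)
        {
            _publisher.Options.SendHighWatermark = 1500;
            _publisher.Bind($"tcp://*:{port}");

            // Drain the queue on the poller thread - the only thread that
            // ever touches _publisher after construction.
            _queue.ReceiveReady += (_, e) =>
            {
                while (e.Queue.TryDequeue(out var item, TimeSpan.Zero))
                    _publisher.SendMoreFrame(item.Topic).SendFrame(item.Payload);
            };

            _poller = new NetMQPoller { _publisher, _queue };
            _poller.RunAsync();
        }

        // Safe to call from any producer thread: NetMQQueue.Enqueue is thread-safe.
        public void Publish(string topic, string payload) => _queue.Enqueue((topic, payload));

        public void Dispose()
        {
            _poller.Stop();
            _poller.Dispose();
            _publisher.Dispose();
            _queue.Dispose();
        }
    }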