Search code examples
c#.netsocketszeromqnetmq

ZeroMQ thread/task advice


I have a a task that is running this method for my Sub/Pub pattern. The problem i see is that this loop will hang until data is received. If i want to change the Topic or the IP, i can't get this task to end. If i use a thread i can kill it - but that seems like an ugly solution. Is there a good asynchronous way to set up the receiving end of my subscriber.

ReceiveTask = new Task(ReceiveData);
ReceiveTask.Start();

private void ReceiveData()
{
    while (true)
    {
        byte[] messageTopicReceived = SubSocket.ReceiveFrameBytes();
        byte[] messageReceived = SubSocket.ReceiveFrameBytes();
    }
    //Code that uses byte array and do stuff
}

Solution

  • You could call the Try extension instead, it has the following signature:

    public static bool TryReceiveFrameBytes([NotNull] this IReceivingSocket socket, TimeSpan timeout, out byte[] bytes)
    

    That would allow you to pass a TimeSpan.Zero as the timeout which will make it a non-blocking call. With the while(true) that would work fine but you would probably need to add a Task.Delay or Thread.Sleep in there. A Task.Delay would work best as it accepts a CancellationToken. That token can be cancelled from a different thread to quickly get out of the loop. Something along these lines:

    private void ReceiveData(CancellationToken cancellationToken)
    {
        while (true)
        {
            bool receivedTopicMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageTopicReceived);
            bool receivedMessage = SubSocket.TryReceiveFrameBytes(TimeSpan.Zero, out byte[] messageReceived);
    
            Task.Delay(1000, cancellationToken);
            if (cancellationToken.IsCancellationRequested)
            {
                return;
            }
        }
        //Code that uses byte array and do stuff
    }