Search code examples
c#.netmultithreadingzeromqnetmq

netMQ 4.0 multithreading


I have some problems with multi-threaded server based on netMQ 4.0. I tried to use http://zguide.zeromq.org/cs:mtserver, but there is no context on netMQ 4.0.

I tried:

for (var i = 0; i < workerCount; ++i)
{
    new Thread(() => Worker(connStr.Value)).Start();
}

//...
private void Worker(string connStr)
{
    using (var socket = new DealerSocket(connStr))
    {
        while (true)
        {
            var msg = socket.ReceiveMultipartMessage();
            //...
        }
    }
}

but I get error:

NetMQ.TerminatingException: CheckContextTerminated

and yes, it is terminated.

How can I create context in netMQ 4.0 or how can I create multi-threading server using netMQ 4.0?


Solution

  • If you are using the .NET version 4.0 or higher, the Thread creation approach is outdated and shouldn't be used in such way - if your workerCount is high enough and you aren't providing any scheduler logic, your performance could degrade significantly instead of benefit.

    This you can do instead your approach using TPL:

    1. You can easily replace your worker threads with LongRunning tasks.
    2. You probably should introduce the CancellationToken for your workers to stop them correctly.

    So your code could be something like this:

    /// field in your class
    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    
    using (var clients = new RouterSocket(connStr.Value))
    using (var workers = new DealerSocket())
    {
        workers.Bind("inproc://workers");
        for (var i = 0; i < workerCount; ++i)
        {
            Task.Factory.StartNew(Worker
                , cancellationTokenSource.Token
                , TaskCreationOptions.LongRunning
                , TaskScheduler.Default);
        }
        var prx = new Proxy(clients, workers);
        prx.Start();
    }
    
    private void Worker()
    {
        using (var socket = new ResponseSocket())
        {
            socket.Connect("inproc://workers");
            while (!cancellationTokenSource.Token.IsCancellationRequested)
            {
                 //...
            }
            // Cancel the task and exit
            cancellationTokenSource.Token.ThrowIfCancellationRequested();
        }
    }
    

    To simplify it you can pass the CancellationToken as a parameter to your Worker method.