Search code examples
c#multithreadingzeromqnetmq

How do you shutdown a ZMQ QueueDevice from a worker thread in c#


First time using ZMQ and I'm trying to setup a process to handle many getimage requests. What happens when I'm debugging is several exceptions that I'm trying to fix and implement a way to stop the QueueDevice terminate all the thread and exit gracefully.

  1. receiver.Connect(BackendBindAddress); throws An unhandled exception of type 'NetMQ.InvalidException' occurred in NetMQ.dll with the error code NetMQ.zmq.ErrorCode.EINVAL. Why doesn't this exception stop further execution of the thread?
  2. I've tried making QueueDevice a static field and using queueDevice.stop() in the shutdown message function but then the threads start throwing TerminatingExceptions and neverexit. So can I shut down all the threads and the main thread?

Test driving the code

    [TestMethod]
    public void ProgramStartupShutdownTest()
    {
        var mockClientWrapper = new Mock<IClient>(MockBehavior.Strict);

        var target = new SocketListener(2, mockClientWrapper.Object);

        var task = Task.Factory.StartNew(() => target.StartListening("tcp://localhost:81"));
        using (var client = NetMQContext.Create())
        {
           var socket = client.CreateRequestSocket();
           socket.Connect("tcp://localhost:81");
           var m = new NetMQMessage(new ShutdownMessage().CreateMessageFrames());
           socket.SendMessage(m);
        }

        var timedout = task.Wait(200);
        Assert.IsTrue(timedout);
    }

Code I'm working with

private const string BackendBindAddress = "inproc://workers";
public SocketListener(int numberOfWorkers, IClient client )
    {
        numberOfThreads = numberOfWorkers;
        _client = client;
    }

public void StartListening(string address)
    {
        StartZeroMQ(address, context =>
        {
            for (var i = 0; i <= numberOfThreads; i++)
            {
                var t = new Thread(WorkerRoutine);
                t.Start(
                        new WorkerParamters
                        {
                            Context = context,
                            Client = _client
                        }
                    );
            }
        });
    }


    private void StartZeroMQ(string address, Action<NetMQContext> setupWorkers)
    {
        using (var context = NetMQContext.Create())
        {
            var queueDevice = new QueueDevice(context, address, BackendBindAddress, DeviceMode.Blocking);
            setupWorkers(context);
            queueDevice.Start();
        }
    }

    struct WorkerParamters
    {
        public NetMQContext Context;
        public IClient Client;
    }

    private static void WorkerRoutine(object startparameter)
    {
            var wp = (WorkerParamters) startparameter;
            var client = wp.Client;
            using (var receiver = wp.Context.CreateResponseSocket())
            {
                receiver.Connect(BackendBindAddress);
                var running = true;
                while (running)
                {
                        var message = receiver.ReceiveMessage();
                        var letter = Message.ParseMessageFrame(message,
                                                               imageMessage => GetImage(imageMessage, client),
                                                               videoMessage => GetVideo(videoMessage, client),
                                                               shutdownMessage =>
                                                               {
                                                                   running = false;
                                                                   return true;
                                                               });

                        receiver.Send(letter.ToJson(), Encoding.Unicode);
                }
            }
    }

Solution

  • To overcome the issue I added an Initialize method to the device, call it before starting the workers and after calling the workers start the device.

    You can grab the code from here(you need to compile the project from the repository), I will add a pull request later.

    You can also write the device logic by hand, it shouldn't be complicated.