NetMQ: why is "SendReady" needed for REQ-REP?


I have a problem that I managed to fix. However, I'm a little concerned, as I don't really understand why the solution worked.

I am using NetMQ, and specifically a NetMQ poller which has a number of sockets, one of which is a REQ-REP pair.

I have a queue of requests that are dequeued and sent to the server; the server handles each request type as required and sends back an appropriate response. This had been working without issue, but when I added another request type the system stopped working as expected: the request would reach the server, the server would send the response, and the client would not receive it. The reply only arrived at the client once the server was shut down (unusual behavior!).

I had been managing the REQ-REP pair with a flag that I set before sending a request and reset on receipt of a reply. I managed to fix the issue by only sending requests from within the "SendReady" event of the REQ socket. This automagically fixed all of my issues, but I can't find anything in the documentation that tells me why the socket might not have been in the "SendReady" state, or what this event actually does.

Any information that could be shed on why this is working now would be great :)

Cheers.

Edit: Source

Client:

"Subscribe" runs on a separate thread from the UI.

    private void Subscribe(string address)
    {
        using (var req = new RequestSocket(address + ":5555"))
        using (var sub = new SubscriberSocket(address + ":5556"))
        using (var poller = new NetMQPoller { req, sub })
        {
            // Handle updates published by the server
            sub.ReceiveReady += (s, a) =>
            {
                var type = sub.ReceiveFrameString();
                var reply = sub.ReceiveFrameString();

                switch (type)
                {
                    case "Type1":
                        manager.ChangeValue(reply);
                        break;

                    case "Type2":
                        string[] args = reply.Split(',');
                        eventAggregator.PublishOnUIThread(new MyEvent(args[0], (SimObjectActionEventType)Enum.Parse(typeof(MyEventType), args[1])));
                        break;
                }
            };

            req.ReceiveReady += Req_ReceiveReady;

            poller.RunAsync();

            sub.Connect(address + ":5556");
            sub.SubscribeToAnyTopic();
            sub.Options.ReceiveHighWatermark = 10;

            reqQueue = new Queue<string[]>();

            reqQueue.Enqueue(new string[] { "InitialiseClient", "" });

            req_sending = false;

            while (programRunning)
            {
                if (reqQueue.Count > 0 && !req_sending)
                {
                    req_sending = true;
                    string[] request = reqQueue.Dequeue();
                    Console.WriteLine("Sending " + request[0] + " " + request[1]);
                    req.SendMoreFrame(request[0]).SendFrame(request[1]);
                }

                Thread.Sleep(1);
            }
        }
    }

    private void Req_ReceiveReady(object sender, NetMQSocketEventArgs e)
    {
        var req = e.Socket;

        var messageType = req.ReceiveFrameString();

        Console.WriteLine("Received {0}", messageType);

        switch (messageType)
        {
            case "Reply1":
                // Receive action

                break;

            case "Reply2":
                // Receive action

                break;

            case "Reply3":
                // Receive action

                break;

        }

        req_sending = false;
    }

Server:

        using (var rep = new ResponseSocket("@tcp://*:5555"))
        using (var pub = new PublisherSocket("@tcp://*:5556"))
        using (var beacon = new NetMQBeacon())
        using (var poller = new NetMQPoller { rep, pub, beacon })
        {
            // Send program code when a request for a code update is received
            rep.ReceiveReady += (s, a) =>
            {
                var messageType = rep.ReceiveFrameString();
                var message = rep.ReceiveFrameString();

                Console.WriteLine("Received {0} - Content: {1}", messageType, message);

                switch (messageType)
                {
                    case "InitialiseClient":
                        // Send
                        rep.SendMoreFrame("Reply1").SendFrame(repData);
                        break;

                    case "Req2":
                        // do something
                        rep.SendMoreFrame("Reply2").SendFrame("RequestOK");
                        break;

                    case "Req3":
                        args = message.Split(',');

                        if (args.Length == 2)
                        {
                            // Do Something

                            rep.SendMoreFrame("Reply3").SendFrame("RequestOK");
                        }
                        else
                        {
                            rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
                        }

                        break;

                    case "Req4":
                        args = message.Split(',');

                        if (args.Length == 2)
                        {

                            requestData = //do something

                            rep.SendMoreFrame("Reply4").SendFrame(requestData);
                        }
                        else
                        {
                            rep.SendMoreFrame("Ack").SendFrame("RequestError - incorrect argument format");
                        }

                        break;

                    default:
                        rep.SendMoreFrame("Ack").SendFrame("Error");
                        break;
                }
            };

            // setup discovery beacon with 1 second interval
            beacon.Configure(5555);
            beacon.Publish("server", TimeSpan.FromSeconds(1));

            // start the poller
            poller.RunAsync();

            // run the simulation loop
            while (serverRunning)
            {
                // todo - make this operate more efficiently
                // push updated variable values to clients
                foreach (string[] message in pubQueue)
                {
                    pub.SendMoreFrame(message[0]).SendFrame(message[1]);
                }

                pubQueue.Clear();

                Thread.Sleep(2);
            }

            poller.StopAsync();
        }

Solution

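  • You are using the Request socket from multiple threads, which is not supported: NetMQ sockets are not thread-safe. You are sending on the thread that runs the Subscribe loop and receiving on the poller thread, because poller.RunAsync() raises ReceiveReady on its own thread. SendReady handlers run on the poller thread too, which is why moving the send there made the problem go away.

    Instead of a regular Queue, try NetMQQueue: you can add it to the poller and enqueue from the UI thread. The send then happens on the poller thread, the same thread as the receive.

    You can read the docs here: http://netmq.readthedocs.io/en/latest/queue/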
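
The NetMQQueue suggestion above might look roughly like the following. This is a minimal sketch, not the asker's actual code: the class name `QueueDemo`, the loopback endpoint, the in-process echo server, and the `pending` / `awaitingReply` bookkeeping are all assumptions made for illustration. The point it shows is that every socket call happens inside a poller event handler, so the REQ socket is only ever touched by the poller thread, while other threads only call `Enqueue`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;

public static class QueueDemo
{
    // Hypothetical endpoint chosen for this sketch.
    private const string Endpoint = "tcp://127.0.0.1:5555";

    public static List<string> Run()
    {
        var replies = new BlockingCollection<string>();

        using (var rep = new ResponseSocket("@" + Endpoint))   // stand-in server (bind)
        using (var req = new RequestSocket(">" + Endpoint))    // client (connect)
        using (var outbox = new NetMQQueue<string[]>())        // thread-safe hand-off
        using (var poller = new NetMQPoller { rep, req, outbox })
        {
            // Both variables are only touched from poller event handlers.
            var pending = new Queue<string[]>();
            bool awaitingReply = false;

            // REQ is strictly send, receive, send, receive: only send when idle.
            void SendNext()
            {
                if (awaitingReply || pending.Count == 0) return;
                string[] request = pending.Dequeue();
                req.SendMoreFrame(request[0]).SendFrame(request[1]);
                awaitingReply = true;
            }

            // Stand-in server: acknowledge each request by echoing its type.
            rep.ReceiveReady += (s, a) =>
            {
                List<string> frames = rep.ReceiveMultipartStrings();
                rep.SendMoreFrame("Ack").SendFrame(frames[0]);
            };

            // Raised on the poller thread whenever another thread enqueues.
            outbox.ReceiveReady += (s, a) =>
            {
                pending.Enqueue(outbox.Dequeue());
                SendNext();
            };

            // Read the whole multipart reply, then send the next request.
            req.ReceiveReady += (s, a) =>
            {
                List<string> frames = req.ReceiveMultipartStrings();
                replies.Add(frames[1]);
                awaitingReply = false;
                SendNext();
            };

            poller.RunAsync();

            // Enqueue is the only call made from outside the poller thread.
            outbox.Enqueue(new[] { "InitialiseClient", "" });
            outbox.Enqueue(new[] { "Req2", "" });

            // Wait (with a timeout) for both replies before shutting down.
            var result = new List<string>();
            while (result.Count < 2 &&
                   replies.TryTake(out string reply, TimeSpan.FromSeconds(5)))
            {
                result.Add(reply);
            }
            return result;
        }
    }
}
```

In the original client, the `while (programRunning)` loop and the `req_sending` flag would no longer be needed under this approach: the UI thread would call `outbox.Enqueue(...)` directly, and the lockstep between request and reply would be enforced inside the handlers.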