Search code examples
wiresharknetmq

What is the currently proper request-response pattern for NetMQ?


I'm trying to upgrade a Windows desktop application from .Net Framework to .Net (Core) 6.0. As part of that, I need to use NetMQ instead of the old clrzmq. But every reference I find for how to do a simple request-response using the new API has been obsoleted by subsequent updates. I found working code at this question, but again, some of the methods used no longer exist. I attempted to convert the source reasonably and arrived at the below.

The server prints nothing; the client claims to be sending messages; Wireshark sees no messages on "port 5556". (I tagged Wireshark in case I'm using it wrong.)

I think if I can find out how this code should work I can properly convert my original application. Any help would be much appreciated.

Program.cs (by itself in its own solution, for the server):

using NetMQ;
using NetMQ.Sockets;
using NLog;

class Program
{
  private static Logger _logger = LogManager.GetCurrentClassLogger();

  static void Main(string[] args)
  {
    try
    {
      using (var responseSocket = new ResponseSocket())
      {
        responseSocket.Connect("tcp://localhost:5556");
        var poller = new NetMQPoller();
                responseSocket.ReceiveReady += RouterSocketOnReceiveReady;
        poller.Add(responseSocket);
        poller.Run();
      }
    }
    catch (Exception e)
    {
      Console.WriteLine(e);
    }

    Console.ReadKey();
  }

  private static void RouterSocketOnReceiveReady(object? sender, NetMQSocketEventArgs netMqSocketEventArgs)
  {
    NetMQMessage? clientMessage = new();
    bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
        ref clientMessage, 5);

    if (result == false || clientMessage == null)
    {
      Console.WriteLine("Something went wrong?!");
      return;
    }

    var address = clientMessage[0];
    var address2 = clientMessage[1];
    var clientMessageString = clientMessage[3].ConvertToString();

    //_logger.Debug("Message from client received: '{0}'", clientMessageString);
    Console.WriteLine(String.Format("Message from client received: '{0}'", clientMessageString));

    netMqSocketEventArgs
        .Socket.SendMoreFrame(address.Buffer)
        .SendMoreFrame(address2.Buffer)
        .SendMoreFrameEmpty()
        .SendFrame("I have received your message");
  }
}

CollectorDevice.cs (in the client project and solution):

using NetMQ;
using NetMQ.Sockets;
using NLog;

public class CollectorDevice : IDisposable
{
    private NetMQPoller _poller;
    private RouterSocket _frontendSocket;
    private DealerSocket _backendSocket;
    private readonly string _backEndAddress;
    private readonly string _frontEndAddress;
    private readonly int _expectedFrameCount;
    private readonly ManualResetEvent _startSemaphore = new(false);
    private readonly Thread _localThread;
    private static Logger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="backEndAddress"></param>
    /// <param name="frontEndAddress"></param>
    /// <param name="expectedFrameCount"></param>
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
    {
        _expectedFrameCount = expectedFrameCount;

        _backEndAddress = backEndAddress;
        _frontEndAddress = frontEndAddress;

        _frontendSocket = new RouterSocket(_frontEndAddress);
        _backendSocket = new DealerSocket(_backEndAddress);

        _backendSocket.ReceiveReady += OnBackEndReady;
        _frontendSocket.ReceiveReady += OnFrontEndReady;

        _poller = new NetMQPoller { _frontendSocket, _backendSocket };

        _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
    }

    public void Start()
    {
        _localThread.Start();
        _startSemaphore.WaitOne();
    }

    public void Stop()
    {
        _poller.Stop();
    }

    #region Implementation of IDisposable

    public void Dispose()
    {
        Stop();
    }

    #endregion


    #region Private Methods
    private void DoWork()
    {
        try
        {
            _startSemaphore.Set();

            _poller.Run();
        }
        catch (Exception e)
        {
            _logger.Error(e);
        }
    }

    private void OnBackEndReady(object? sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _frontendSocket.SendMultipartMessage(message);
    }

    private void OnFrontEndReady(object? sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _backendSocket.SendMultipartMessage(message);
    }

    #endregion
}

Program.cs (also in the client project and solution):

using NetMQ;
using NetMQ.Sockets;
using NLog;
using System.Text;

class Program
{
  private static Logger _logger = LogManager.GetCurrentClassLogger();


  private static void Main(string[] args)
  {
    Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
    Console.ReadKey();

    var encoding = Encoding.ASCII;

    using (var collectorDevice = new CollectorDevice("tcp://localhost:5556", "inproc://broker", 3))
    {
      collectorDevice.Start();

      var tasks = new List<Task>();
      for (int i = 0; i < 100; i++)
      {
        Console.WriteLine(i);
        int j = i;
        Task t = Task.Factory.StartNew(() =>
        {
          try
          {
            using (var requestSocket = new RequestSocket("inproc://broker"))
            {
              requestSocket.SendFrame(encoding.GetBytes(String.Format("Request client: {0} id: {1}", j, Task.CurrentId)));
              _logger.Debug(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));
              Console.WriteLine(String.Format("Request client: {0} id: {1}", j, Task.CurrentId));

              string responseMessage = requestSocket.ReceiveFrameString();
              _logger.Debug(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
              Console.WriteLine(String.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
            }
          }
          catch (Exception e)
          {
            Console.WriteLine(e);
            _logger.Error(e);
          }
        });
        tasks.Add(t);
      }

      Task.WaitAll(tasks.ToArray());
    }

  }
}

Solution

  • I got my answer at the GitHub issue I opened. The updated code successfully exchanges messages and terminates with 0, but the messages are out of order; additional logic would be required to put the messages in order, but the point of this question was just to exchange messages.

    The necessary changes are mostly in the server's Program.cs:

    using NetMQ;
    using NetMQ.Sockets;
    using NLog;
    
    Logger _logger = LogManager.GetCurrentClassLogger();
    
    try
    {
        using (var responseSocket = new ResponseSocket())
        {
            responseSocket.Bind("tcp://localhost:5556");
            var poller = new NetMQPoller();
            responseSocket.ReceiveReady += RouterSocketOnReceiveReady;
            poller.Add(responseSocket);
            poller.Run();
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
    
    Console.ReadKey();
    
    void RouterSocketOnReceiveReady(object? sender, NetMQSocketEventArgs netMqSocketEventArgs)
    {
        NetMQMessage? clientMessage = new();
        bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
            ref clientMessage, 5);
    
        if (result == false || clientMessage == null)
        {
            Console.WriteLine("Something went wrong?!");
            return;
        }
    
        var clientMessageString = clientMessage.Single().ToByteArray();
    
        //_logger.Debug("Message from client received: '{0}'", clientMessageString);
        Console.WriteLine(string.Format("Message from client received: '{0}'", string.Join(", ", clientMessageString)));
    
        netMqSocketEventArgs
            .Socket
            .SendFrame("I have received your message");
    }
    

    Here's the client's Program.cs:

    using NetMQ;
    using NetMQ.Sockets;
    using NLog;
    using System.Text;
    
    Logger _logger = LogManager.GetCurrentClassLogger();
    
    Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
    Console.ReadKey();
    
    var encoding = Encoding.ASCII;
    
    using var collectorDevice = new CollectorDevice("tcp://localhost:5556", "inproc://broker", 3);
    
    collectorDevice.Start();
    
    var tasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    {
        Console.WriteLine(i);
        int j = i;
        Task t = Task.Factory.StartNew(() =>
        {
            try
            {
                using (var requestSocket = new RequestSocket("inproc://broker"))
                {
                    requestSocket.SendFrame(encoding.GetBytes(string.Format("Request client: {0} id: {1}", j, Task.CurrentId)));
                    _logger.Debug(string.Format("Request client: {0} id: {1}", j, Task.CurrentId));
                    Console.WriteLine(string.Format("Request client: {0} id: {1}", j, Task.CurrentId));
    
                    string responseMessage = requestSocket.ReceiveFrameString();
                    _logger.Debug(string.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
                    Console.WriteLine(string.Format("Response from server: {0} id: {1} message: {2}", j, Task.CurrentId, responseMessage));
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                _logger.Error(e);
            }
        });
        tasks.Add(t);
    }
    
    Task.WaitAll(tasks.ToArray());
    

    And the client's CollectorDevice.cs:

    using NetMQ;
    using NetMQ.Sockets;
    using NLog;
    
    public class CollectorDevice : IDisposable
    {
        private NetMQPoller _poller;
        private RouterSocket _frontendSocket;
        private DealerSocket _backendSocket;
        private readonly string _backEndAddress;
        private readonly string _frontEndAddress;
        private readonly int _expectedFrameCount;
        private readonly ManualResetEvent _startSemaphore = new(false);
        private readonly Thread _localThread;
        private static Logger _logger = LogManager.GetCurrentClassLogger();
    
        /// <summary>
        /// Constructor
        /// </summary>
        /// <param name="backEndAddress"></param>
        /// <param name="frontEndAddress"></param>
        /// <param name="expectedFrameCount"></param>
        public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
        {
            _expectedFrameCount = expectedFrameCount;
    
            _backEndAddress = backEndAddress;
            _frontEndAddress = frontEndAddress;
    
            _frontendSocket = new RouterSocket(_frontEndAddress);
            _backendSocket = new DealerSocket(_backEndAddress);
    
            _backendSocket.ReceiveReady += OnBackEndReady;
            _frontendSocket.ReceiveReady += OnFrontEndReady;
    
            _poller = new NetMQPoller { _frontendSocket, _backendSocket };
    
            _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
        }
    
        public void Start()
        {
            _localThread.Start();
            _startSemaphore.WaitOne();
        }
    
        public void Stop()
        {
            _poller.Stop();
        }
    
        #region Implementation of IDisposable
    
        public void Dispose()
        {
            Stop();
        }
    
        #endregion
    
    
        #region Private Methods
        private void DoWork()
        {
            try
            {
                _startSemaphore.Set();
    
                _poller.Run();
            }
            catch (Exception e)
            {
                _logger.Error(e);
            }
        }
    
        private void OnBackEndReady(object? sender, NetMQSocketEventArgs e)
        {
            NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
            _frontendSocket.SendMultipartMessage(message);
        }
    
        private void OnFrontEndReady(object? sender, NetMQSocketEventArgs e)
        {
            NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
            _backendSocket.SendMultipartMessage(message);
        }
    
        #endregion
    }