Why can't the client receive messages from the server with the NetMQ framework?


Recently I have been using NetMQ to send and receive messages between a server and a client. The server code looks like this:

    void Main()
    {
      CreatePullAndPushSocket();
      Task.Factory.StartNew(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        });
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip1:port1");
        Push = new PushSocket("tcp://ip2:port2");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

The client code looks like this:

   void Main()
    { 
      new Thread(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        }).Start();
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip2:port2");
        Push = new PushSocket("tcp://ip1:port1");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

I run two applications: one is the server app, the other is the client app.

  • 1: The client sends a message to the server
  • 2: The server receives the message from the client
  • 3: The server sends another message to the client
  • 4: The client can't receive the message!

This is strange, because I followed the guide at https://netmq.readthedocs.io/en/latest/push-pull/


Solution

  • The most important thing to understand in NetMQ is its threading model: a socket must not be used from multiple threads. If Thread#1 creates a socket, only Thread#1 should use it. If you want to send a message from another thread (say Thread#2) through the same socket, don't touch the socket directly. Instead, pass the message from Thread#2 to Thread#1 and let Thread#1 send it through the socket to the client.

    So CreatePullAndPushSocket is the problem, and strange things can happen as a result: you create the sockets on one thread and use them on another.

    The other issue is your Thread.Sleep. You don't need it: ReceiveFrameString already blocks until a message arrives, so Thread.Sleep(1) only adds a 1 ms pause between receives. If the loop needs to wake up periodically, NetMQ has TryReceive-style methods that take a timeout (for example TryReceiveFrameString), so the thread can wait on the socket for up to 1 second and then check whether you have requested a cancel or stop. Better still, there is a poller, which listens on the socket continuously and can be stopped from another thread.

    Let's look at this code:

    private NetMQPoller _poller;

    public void Start()
    {
        Task.Factory.StartNew(() =>
        {
            // Create the socket and the poller on the thread that uses them.
            using (var pullSocket = new PullSocket("tcp://ip1:port1"))
            using (_poller = new NetMQPoller())
            {
                // Raised on the poller thread whenever a message is waiting.
                pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgs netMqSocketEventArgs) =>
                {
                    var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
                };
                _poller.Add(pullSocket);
                _poller.Run(); // blocks until Stop() is called
            }
        });
    }

    public void Stop()
    {
        _poller?.Stop();
    }
    

    Or, if you would rather use a while loop without a poller:

    private CancellationTokenSource _cts;
    private Task _worker;

    public void Start()
    {
        _cts = new CancellationTokenSource();
        var token = _cts.Token;

        _worker = Task.Factory.StartNew(() =>
        {
            using (var pullSocket = new PullSocket("tcp://ip1:port1"))
            {
                while (!token.IsCancellationRequested)
                {
                    NetMQMessage message = new NetMQMessage();
                    // Wait up to 5 seconds, then loop so the token is checked again.
                    bool success = pullSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);

                    if (!success)
                    {
                        continue;
                    }

                    // If you reach this line, you have a message.
                }
            }
        },
        token,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    }

    public void Stop()
    {
        _cts.Cancel();
        // Optional: block until the receive loop has exited
        // (this can take up to the 5-second receive timeout).
        _worker.Wait();
    }
    

    So, going back to your question: use a socket only on the thread that created it. It is also good practice to wrap the socket in a using statement so that it is always released at the end.

    I can't see where SendMessageToClient is called, but I assume you call it from a button handler or something similar. That is fine only if the socket's constructor was called on that same thread. If you show where you call this method, I can say more.
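
    If SendMessageToClient does turn out to be called from a different thread, one possible way to hand the message over to the socket's thread is NetMQ's NetMQQueue<T> combined with a NetMQPoller. The following is only a sketch under assumptions: the class name QueuedSender and the address parameter are made up for illustration, and it assumes NetMQ 4.x. Any thread may enqueue a message, but only the poller thread touches the push socket.

    ```csharp
    using System;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;

    // Hypothetical helper: not part of NetMQ or of the original question.
    public sealed class QueuedSender
    {
        // Multi-producer queue that any thread may write to.
        private readonly NetMQQueue<string> _outgoing = new NetMQQueue<string>();
        private readonly string _address;
        private NetMQPoller _poller;

        public QueuedSender(string address)
        {
            // Example: ">tcp://ip2:port2" (the ">" prefix means connect).
            _address = address;
        }

        public void Start()
        {
            Task.Factory.StartNew(() =>
            {
                // The push socket is created, used, and disposed on this one thread.
                using (var pushSocket = new PushSocket(_address))
                using (_poller = new NetMQPoller { _outgoing })
                {
                    // Runs on the poller thread each time an item is enqueued.
                    _outgoing.ReceiveReady += (sender, args) =>
                    {
                        pushSocket.SendFrame(_outgoing.Dequeue());
                    };
                    _poller.Run(); // blocks until Stop() is called
                }
            }, TaskCreationOptions.LongRunning);
        }

        // Safe to call from any thread: it never touches the socket.
        public void SendMessageToClient(string message)
        {
            _outgoing.Enqueue(message);
        }

        public void Stop()
        {
            _poller?.Stop();
        }
    }
    ```

    A button handler would then call SendMessageToClient("hello") on an instance that was started once at application startup. The sketch leaves out disposing the queue and guarding against Start being called twice.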