Tags: c#, .net, zeromq, netmq

How can I both Send and Receive from a Router socket in ZeroMQ or NetMQ?


I have a Dealer <--> Router setup in NetMQ v4 where I can asynchronously send and receive messages in any direction with no problems.

I now want to formalize that into an abstraction where the server (Router) listens for any incoming message but can also, on demand, broadcast messages to all of the connected clients (Dealers).

I am trying to avoid using Pub <--> Sub sockets as I need the subscribers to also send messages to the server. The closest pattern to what I am trying to achieve is a WebSocket client-server communication.

The first part, listening to the client messages, is done with something like:

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();
    while (true)
    {
        var msg = server.ReceiveMultipartMessage();

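        // Frame 0 is the client identity, frame 1 the empty delimiter, frame 2 the payload.
        // Decoding the identity as text assumes each Dealer sets a UTF-8 Identity itself.
        var address = msg[0].ConvertToString();
        var payload = msg[2].ConvertToString();
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        // HashSet.Add is a no-op when the address is already present.
        addresses.Add(address);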

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

Now, given that the sockets are not thread-safe, I am stuck on finding a way to broadcast messages (coming on demand from a different thread) to all the clients.

I can probably use the TryReceiveMultipartMessage method in the loop instead, with a set timeout, after which I can check a queue for any pending broadcast messages and then loop through the clients, sending each one the message. Something like:

using (var server = new RouterSocket("@tcp://*:80"))
{
    var addresses = new HashSet<string>();

    var msg = new NetMQMessage();
    while (true)
    {
        var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
        if (!clientHasMsg)
        {
            // Check for any incoming broadcast, then loop through all the clients,
            // sending each one the broadcast message
            var broadMsg = new NetMQMessage();
            foreach (var item in addresses)
            {
                broadMsg.Append(item);
                broadMsg.AppendEmptyFrame();
                broadMsg.Append("This is a broadcast");
                server.SendMultipartMessage(broadMsg);
                broadMsg.Clear();
            }

            // Go back into the loop waiting for client messages
            continue;
        }

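        // Frame 0 is the client identity, frame 1 the empty delimiter, frame 2 the payload.
        // Decoding the identity as text assumes each Dealer sets a UTF-8 Identity itself.
        var address = msg[0].ConvertToString();
        var payload = msg[2].ConvertToString();
        Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);

        // HashSet.Add is a no-op when the address is already present.
        addresses.Add(address);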

        msg.Clear();
        msg.Append(address);
        msg.AppendEmptyFrame();
        msg.Append("Reply for: " + address);
        server.SendMultipartMessage(msg);
    }
}

This does not feel right, mainly because of two open questions:

  • What is a good value for the timeout: 1 s, 100 ms, something else?
  • Is this the most efficient solution? The program is expected to serve 100k+ connected clients, each sending thousands of messages per second.

Any pointers on the best approach to this are very much appreciated.


Solution

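  • You can use NetMQQueue<T>, which is a multi-producer, single-consumer queue. Add it to a NetMQPoller together with the Router socket; other threads can then enqueue broadcasts without locking, and the queue is drained on the poller thread, the only thread that touches the socket.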
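
A minimal sketch of that idea, assuming NetMQ v4 and the same frame layout as in the question (identity, empty delimiter, payload). The class name, the `clients` dictionary, the background producer task, and the 5-second interval are illustrative assumptions, not part of the original code. The point is that both handlers run on the poller thread, so no receive timeout is needed and the socket is never shared between threads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

class BroadcastServer // hypothetical name
{
    static void Main()
    {
        using (var server = new RouterSocket("@tcp://*:80"))
        using (var broadcasts = new NetMQQueue<string>())
        using (var poller = new NetMQPoller { server, broadcasts })
        {
            // Identities are stored as raw bytes so they round-trip exactly;
            // the Base64 key is only used for lookup and logging.
            var clients = new Dictionary<string, byte[]>();

            // Runs on the poller thread whenever a client message arrives.
            server.ReceiveReady += (s, e) =>
            {
                var msg = e.Socket.ReceiveMultipartMessage();
                var identity = msg[0].ToByteArray();
                var key = Convert.ToBase64String(identity);
                var payload = msg[msg.FrameCount - 1].ConvertToString();
                Console.WriteLine("[Server] - Client: {0} Says: {1}", key, payload);

                clients[key] = identity;

                var reply = new NetMQMessage();
                reply.Append(identity);
                reply.AppendEmptyFrame();
                reply.Append("Reply for: " + key);
                e.Socket.SendMultipartMessage(reply);
            };

            // Also runs on the poller thread, so using the socket here is safe.
            broadcasts.ReceiveReady += (s, e) =>
            {
                string text;
                while (e.Queue.TryDequeue(out text, TimeSpan.Zero))
                {
                    foreach (var identity in clients.Values)
                    {
                        var msg = new NetMQMessage();
                        msg.Append(identity);
                        msg.AppendEmptyFrame();
                        msg.Append(text);
                        server.SendMultipartMessage(msg);
                    }
                }
            };

            // Stand-in for "some other thread": it only calls Enqueue and
            // never touches the socket.
            Task.Run(() =>
            {
                while (true)
                {
                    Thread.Sleep(TimeSpan.FromSeconds(5));
                    broadcasts.Enqueue("This is a broadcast");
                }
            });

            poller.Run(); // blocks and dispatches both ReceiveReady events
        }
    }
}
```

Any number of producer threads can call `broadcasts.Enqueue(...)`. This sketch never removes clients that disconnect, so a real server would probably also need some form of heartbeat or expiry for the `clients` dictionary.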