I took one of the simple receive/request socket example on https://netmq.readthedocs.io/ and wanted to make it work with a parametrizedThread in an infinite loop. The code works fine for few loops, after which it throws the
A non-blocking socket operation could not be completed immediately
For what I got the above should happen immediately after the first loop and not randomly. What is the issue here? It sounds like something has to be flushed out in order to get a clean connection again (not sure).
class Program
{
public class Connector
{
public String connection { get; set; }
public ResponseSocket server { get; set; }
public Connector(string address, ResponseSocket server_)
{
this.connection = address;
this.server = server_;
}
}
static void Main(string[] args)
{
string connection = "tcp://localhost:5555";
using (var server = new ResponseSocket())
{
while (true)
{
try
{
server.Bind(connection);
}
catch (NetMQException e)
{
Console.WriteLine(e.ErrorCode);
}
Connector c = new Connector(connection, server);
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(c);
//runClientSide(connection, server);
}
}
}
private static void runClientSide(object param)
{
Connector conn = (Connector)param;
string connection = conn.connection;
ResponseSocket server = conn.server;
using (var client = new RequestSocket())
{
client.Connect(connection);
client.SendFrame("Hello");
string fromClientMessage = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.SendFrame("Hi Back");
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
//Console.ReadLine();
}
}
NetMQSockets are not thread safe and you are accessing the server from inside your client threads to send/receive data. Client shouldn't have access to server socket anyway.
First of all move the Bind outside of the while loop, it is needed once only, not for every client created. To wait for messages use NetMQPoller, it will handle everything else for you and will raise servers ReceiveReady event, once a message was received.
static void Main(string[] args) {
string connection = "tcp://localhost:5555";
using (var poller = new NetMQPoller()) {
using (var server = new ResponseSocket()) {
server.ReceiveReady += Server_ReceiveReady;
poller.Add(server);
poller.RunAsync();
server.Bind(connection);
// start 10000 clients
for(int i = 0; i < 10000; i++) {
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(connection);
}
Console.ReadLine(); //let server run until user pressed Enter key
}
}
}
//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
string fromClientMessage = e.Socket.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
e.Socket.SendFrame("Hi Back");
}
private static void runClientSide(object param) {
string connection = (string) param;
using (var client = new RequestSocket()) {
client.Connect(connection);
client.SendFrame("Hello");
//Removed server side code here and put it into ReceiveReady event
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
}
}