This is my first time using ZMQ, and I'm trying to set up a process to handle many getimage requests. While debugging I get several exceptions that I'm trying to fix, and I also want to implement a way to stop the QueueDevice, terminate all the threads, and exit gracefully.
Test driving the code
/// <summary>
/// Starts a <c>SocketListener</c> on a background task, sends it a
/// <c>ShutdownMessage</c> over a request socket, and asserts that the
/// listening task finishes within 200 ms.
/// </summary>
[TestMethod]
public void ProgramStartupShutdownTest()
{
    var mockClientWrapper = new Mock<IClient>(MockBehavior.Strict);
    var target = new SocketListener(2, mockClientWrapper.Object);
    var task = Task.Factory.StartNew(() => target.StartListening("tcp://localhost:81"));

    using (var client = NetMQContext.Create())
    // Stacked usings dispose in reverse order, so the socket is released
    // before the context that created it instead of being leaked.
    using (var socket = client.CreateRequestSocket())
    {
        socket.Connect("tcp://localhost:81");
        var m = new NetMQMessage(new ShutdownMessage().CreateMessageFrames());
        socket.SendMessage(m);
    }

    // Task.Wait(int) returns true when the task completed within the
    // timeout, i.e. the listener shut down in time.
    var completed = task.Wait(200);
    Assert.IsTrue(completed);
}
Code I'm working with
// In-process endpoint handed to the QueueDevice as its backend address and
// connected to by every worker's response socket.
private const string BackendBindAddress = "inproc://workers";
/// <summary>
/// Creates a listener, remembering how many workers to use and which client
/// the workers should be given.
/// </summary>
/// <param name="numberOfWorkers">Stored as the worker thread count.</param>
/// <param name="client">Client instance passed on to each worker.</param>
public SocketListener(int numberOfWorkers, IClient client)
{
    _client = client;
    numberOfThreads = numberOfWorkers;
}
/// <summary>
/// Delegates to <c>StartZeroMQ</c> for <paramref name="address"/>, supplying a
/// callback that spawns the worker threads on the provided context.
/// </summary>
/// <param name="address">Frontend address forwarded to <c>StartZeroMQ</c>.</param>
public void StartListening(string address)
{
    StartZeroMQ(address, context =>
    {
        // Strict '<' so that exactly numberOfThreads workers are started
        // (an inclusive bound would start one more than requested).
        for (var i = 0; i < numberOfThreads; i++)
        {
            var worker = new Thread(WorkerRoutine);
            worker.Start(
                new WorkerParamters
                {
                    Context = context,
                    Client = _client
                });
        }
    });
}
/// <summary>
/// Creates a NetMQ context, builds a blocking-mode QueueDevice between
/// <paramref name="address"/> and the in-process worker endpoint, lets the
/// caller set up its workers, and then starts the device.
/// </summary>
/// <param name="address">Frontend address given to the QueueDevice.</param>
/// <param name="setupWorkers">Callback invoked with the context before the device is started.</param>
private void StartZeroMQ(string address, Action<NetMQContext> setupWorkers)
{
    using (var zmqContext = NetMQContext.Create())
    {
        var device = new QueueDevice(zmqContext, address, BackendBindAddress, DeviceMode.Blocking);

        // NOTE(review): workers are set up before the device is started; if the
        // device only binds its backend inside Start(), the workers may connect
        // to an endpoint that is not bound yet - confirm.
        setupWorkers(zmqContext);

        device.Start();
    }
}
/// <summary>
/// Bundle of values handed to a worker thread as its start parameter.
/// </summary>
struct WorkerParamters
{
    /// <summary>Client the worker passes to the image/video handlers.</summary>
    public IClient Client;

    /// <summary>Context the worker uses to create its response socket.</summary>
    public NetMQContext Context;
}
/// <summary>
/// Thread entry point for a worker: connects a response socket to the
/// in-process backend endpoint, then repeatedly receives a message, dispatches
/// it through <c>Message.ParseMessageFrame</c>, and sends the result back as
/// Unicode-encoded JSON. The loop ends after a shutdown message has been
/// handled and its reply sent.
/// </summary>
/// <param name="startparameter">A boxed <c>WorkerParamters</c> value.</param>
private static void WorkerRoutine(object startparameter)
{
    var parameters = (WorkerParamters) startparameter;
    var backendClient = parameters.Client;

    using (var responder = parameters.Context.CreateResponseSocket())
    {
        responder.Connect(BackendBindAddress);

        // The flag starts out true, so a do/while performs the same iterations
        // as checking it up front.
        var keepGoing = true;
        do
        {
            var request = responder.ReceiveMessage();

            var reply = Message.ParseMessageFrame(
                request,
                image => GetImage(image, backendClient),
                video => GetVideo(video, backendClient),
                shutdown =>
                {
                    // Finish this iteration (the reply is still sent), then stop.
                    keepGoing = false;
                    return true;
                });

            responder.Send(reply.ToJson(), Encoding.Unicode);
        } while (keepGoing);
    }
}
To overcome the issue I added an Initialize method to the device: call it before starting the workers, and start the device only after the workers have been started.
You can grab the code from here (you need to compile the project from the repository); I will add a pull request later.
You can also write the device logic by hand, it shouldn't be complicated.