I am trying to implement a hub with XPUB/XSUB pattern with NetMQ + control socket (to control hub's behavior). I use proxy, NetMqPoller, and want to use control socket. But whatever I try - it does not work. Here is my code, any ideas why it does not work?
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlIn As New StreamSocket(">tcp://127.0.0.1:5678")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlIn, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using
And also there is a method which runs everytime ctrlIn socket receives data:
Sub ctrlIn_ReceiveReady(sender As Object, e As NetMQSocketEventArgs)
Dim bytes() As Byte
While (e.Socket.TryReceiveFrameBytes(bytes))
Console.WriteLine("Received {0} bytes.", bytes.Length)
End While
End Sub
Now short explanation: hub (XPUB/XSUB) works perfectly, that is when I start a publisher and a subscriber - I can see the messages flowing. But the control socket does not work, all I get is two messages in it:
Received 5 bytes.
Received 10 bytes.
And after that - control socket stays silent, no more bytes flow through it. Does anyone know where am I wrong? Or maybe anyone can point to a working example? I have been looking for an example, but could not find anythig with control sockets working.
Why is the control socket of type Stream? Also you should have another socket at the other end of the control socket, right now you are connecting the control socket of type Stream to the Publisher, Stream and Publisher cannot talk to each other.
Try something like this (sorry for syntax error, not a VB developer)
Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
Using plr As New NetMQPoller()
Using ctrlOut As New Dealer("@inproc://control")
Using ctrlIn As New Dealer(">inproc://control")
AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady
plr.Add(xpubSocket)
plr.Add(xsubSocket)
plr.Add(ctrlIn)
Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlOut, plr)
proxy.Start()
plr.Run()
End Using
End Using
End Using
End Using