NetMQ Proxy with control socket usage example in XSub/Xpub pattern?


I am trying to implement a hub using the XPUB/XSUB pattern in NetMQ, plus a control socket (to control the hub's behavior). I use Proxy and NetMQPoller and want to attach a control socket, but whatever I try, it does not work. Here is my code; any ideas why it does not work?

    Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
        Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
            Using plr As New NetMQPoller()
                Using ctrlIn As New StreamSocket(">tcp://127.0.0.1:5678")
                    AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady

                    plr.Add(xpubSocket)
                    plr.Add(xsubSocket)
                    plr.Add(ctrlIn)

                    Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlIn, plr)
                    proxy.Start()

                    plr.Run()
                End Using
            End Using
        End Using
    End Using

There is also a handler that runs every time the ctrlIn socket receives data:

    Sub ctrlIn_ReceiveReady(sender As Object, e As NetMQSocketEventArgs)
        Dim bytes() As Byte

        While (e.Socket.TryReceiveFrameBytes(bytes))
            Console.WriteLine("Received {0} bytes.", bytes.Length)
        End While
    End Sub

A short explanation: the hub (XPUB/XSUB) works perfectly; when I start a publisher and a subscriber, I can see the messages flowing. But the control socket does not work. All I get from it is two messages:

    Received 5 bytes.
    Received 10 bytes.

After that the control socket stays silent; no more bytes flow through it. Does anyone know what I am doing wrong, or can anyone point to a working example? I have been looking for one but could not find anything with control sockets working.


Solution

  • Why is the control socket of type Stream? You also need another socket at the other end of the control socket. Right now you are connecting a Stream socket to tcp://127.0.0.1:5678, the endpoint your XSubscriberSocket is bound to, and a Stream socket cannot talk to the pub/sub sockets.

    Try something like this (sorry for any syntax errors, I am not a VB developer):

    Using xpubSocket As New XPublisherSocket("@tcp://127.0.0.1:1234")
        Using xsubSocket As New XSubscriberSocket("@tcp://127.0.0.1:5678")
            Using plr As New NetMQPoller()
                ' ctrlOut is handed to the proxy; ctrlIn is the peer that reads from it.
                Using ctrlOut As New DealerSocket("@inproc://control")
                    Using ctrlIn As New DealerSocket(">inproc://control")
                        AddHandler ctrlIn.ReceiveReady, AddressOf ctrlIn_ReceiveReady

                        plr.Add(xpubSocket)
                        plr.Add(xsubSocket)
                        plr.Add(ctrlIn)

                        Dim proxy As New Proxy(xsubSocket, xpubSocket, ctrlOut, plr)
                        proxy.Start()

                        plr.Run()
                    End Using
                End Using
            End Using
        End Using
    End Using
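
    With a Dealer pair in place, the ctrlIn handler should start seeing copies of the traffic the proxy relays. As a rough sketch (not tested, and assuming NetMQ's TryReceiveMultipartMessage extension and the NetMQMessage/NetMQFrame types behave as I remember), the handler from the question could read whole multipart messages instead of single frames. The module name ControlHandler is made up for illustration:

    Imports System
    Imports NetMQ

    Module ControlHandler
        ' Hypothetical replacement for the question's ctrlIn_ReceiveReady.
        Sub ctrlIn_ReceiveReady(sender As Object, e As NetMQSocketEventArgs)
            Dim msg As NetMQMessage = Nothing

            ' Drain whatever is queued right now without blocking the poller thread.
            While e.Socket.TryReceiveMultipartMessage(msg)
                Console.WriteLine("Control message with {0} frame(s):", msg.FrameCount)
                For i As Integer = 0 To msg.FrameCount - 1
                    Console.WriteLine("  frame {0}: {1} bytes", i, msg(i).MessageSize)
                Next
            End While
        End Sub
    End Module

    Reading per message rather than per frame keeps the topic frame and the payload frame of one publication together in the output, which makes it easier to tell whether the control socket is really receiving proxied traffic.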