Tags: tcp, julia, ipc, zeromq, pyzmq

Julia ZMQ - connecting to other WebSockets produces StateError


I am trying to use ZMQ to connect many publishers to one subscriber (written in Python). This is one such publisher (I use connect instead of bind because the subscriber binds). The code works fine until I uncomment the commented-out block below.

I then receive this error on Windows:

LoadError: StateError("Unknown error")

On Ubuntu:

StateError("Socket operation on non-socket")

Source code:

using ZMQ
using WebSockets
using JSON3

const uri = "wss://ws.okex.com:8443/ws/v5/public"

function produce_string()
    return "hi"
end

function main()
    payload = Dict(
            :op => "subscribe",
            :args => [
                Dict(
                    "channel" => "books50-l2-tbt",
                    "instType" => "Futures",
                    "instId" => "FIL-USD-220325",
                ),
            ],
        )
    # Uncomment this code to reproduce the error
    # @async while true
    #     WebSockets.open(uri) do ws
    #         confirmation = true
    #         if isopen(ws)
    #             write(ws, JSON3.write(payload))
    #         end
    #     end
    # end

    ctx = Context()
    zmq_socket = Socket(ctx, PUB)
    addr = "tcp://localhost:" * string(8093)
    ZMQ.connect(zmq_socket, addr)
    sleep(3)
    ZMQ.send(zmq_socket, "hi")

    while true
        my_string = produce_string()
        ZMQ.send(zmq_socket, my_string)
        println("sent")
        sleep(1)
    end

end

main()


Solution

  • This seems to be at least in part a bug (or at least hard-to-understand behavior), so I suggest you open an issue on the repo. It may be related to the existing issue "Test Error: Assertion failed: Socket operation on non-socket" (#147).

    However, we can still try to understand what has gone wrong and perhaps find a workaround. ZMQ.jl uses libzmq to handle sockets at a low level, so it might interfere with Julia's own handling of file descriptors, and we may have a race condition. Let's test that hypothesis by modifying your code a bit:

        @async WebSockets.open(uri) do ws
            while true
                if isopen(ws)
                    msg = JSON3.write(payload)
                    write(ws, msg)
                    display(ws.socket.bio)
                    break
                end
            end
        end
    
        sleep(0.1)
        ctx = Context()
        zmq_socket = Socket(ctx, PUB)
        dump(zmq_socket)
        addr = "tcp://localhost:" * string(8093)
        ZMQ.connect(zmq_socket, addr)
        sleep(3)
        ZMQ.send(zmq_socket, "hi")
    

    I only changed a few things so that the code prints the relevant information. The dump of the ZMQ socket shows:

    Socket
      data: Ptr{Nothing} @0x0000000001d5e590
      pollfd: FileWatching._FDWatcher
        handle: Ptr{Nothing} @0x00000000018b7970
        fdnum: Int64 31
        refcount: Tuple{Int64, Int64}
          1: Int64 1
          2: Int64 0
        notify: Base.GenericCondition{Base.Threads.SpinLock}
          waitq: Base.InvasiveLinkedList{Task}
            head: Nothing nothing
            tail: Nothing nothing
          lock: Base.Threads.SpinLock
            owned: Int64 0
        events: Int32 0
        active: Tuple{Bool, Bool}
          1: Bool false
          2: Bool false
    

    and

    TCPSocket(RawFD(31) paused, 0 bytes waiting)
    

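    The pollfd.fdnum field is 31, the same number as the TCPSocket's file descriptor (RawFD(31)). The ZMQ socket's watcher and the WebSocket's TCP socket may therefore be colliding on the same descriptor, so perhaps this is what's going on.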

    What can we do?

    In the code above I already made one change to your original code: I moved the while loop inside the call to WebSockets.open. Do you really want to open a new socket on every iteration? Secondly, we can try to synchronize our tasks a bit, to make sure we've finished opening the WebSocket before calling ZMQ:

    function main()
        payload = Dict(
                :op => "subscribe",
                :args => [
                    Dict(
                        "channel" => "books50-l2-tbt",
                        "instType" => "Futures",
                        "instId" => "FIL-USD-220325",
                    ),
                ],
            )
        msg_channel = Channel(1)
        @async WebSockets.open(uri) do ws
            while true
                if isopen(ws)
                    msg = JSON3.write(payload)
                    put!(msg_channel, msg)
                    write(ws, msg)
                end
            end
        end
    
        println(take!(msg_channel))
        ctx = Context()
        zmq_socket = Socket(ctx, PUB)
        addr = "tcp://localhost:" * string(8093)
        ZMQ.connect(zmq_socket, addr)
        sleep(3)
        ZMQ.send(zmq_socket, "hi")
    
        while true
            my_string = produce_string()
            ZMQ.send(zmq_socket, my_string)
            println("sent")
            sleep(1)
        end
    end
    

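    Here I've used a Channel to communicate between the tasks. This ensures that the WebSocket has finished opening before we continue to the ZMQ code. Because the channel holds only one item and nothing takes from it again, the async task also blocks on a later put!, so it stops writing after the first couple of messages instead of sending the subscribe message in a tight loop. Hopefully you can adapt it to fit your use case.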
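
    To see the Channel handshake in isolation, here is a minimal sketch that uses only Base Julia, with no ZMQ or WebSockets involved. The names start_worker, ready and writes are hypothetical, and push!(writes, msg) merely stands in for write(ws, msg); it illustrates the synchronization pattern under those assumptions and is not a fix for the underlying descriptor problem.

```julia
# Hypothetical stand-in for the WebSocket task: it "opens" a connection,
# signals readiness through a channel, then keeps producing messages.
function start_worker(ready::Channel{String}, writes::Vector{String})
    return @async begin
        sleep(0.1)                  # pretend the connection takes time to open
        n = 0
        while true
            n += 1
            msg = "message $n"
            put!(ready, msg)        # blocks whenever the channel is already full
            push!(writes, msg)      # stands in for write(ws, msg)
        end
    end
end

ready = Channel{String}(1)          # buffered channel with room for one item
writes = String[]
worker = start_worker(ready, writes)

first_msg = take!(ready)            # the main task waits here until the worker is ready
println("worker is ready, got: ", first_msg)

sleep(0.5)                          # give the worker time to run until it blocks
println("writes so far: ", writes)
println("worker finished? ", istaskdone(worker))
```

    The main task cannot get past take! until the worker has reached put!, which is the ordering guarantee the answer relies on. Since nothing takes from the channel a second time, the worker gets through two iterations in this sketch and then stays parked on put!; if you want it to keep sending, take from the channel in your main loop or signal readiness only once.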