I am trying to use ZMQ to connect many publishers to a single subscriber (written in Python). Below is one such publisher; it uses connect rather than bind because the subscriber binds. The code works fine until I uncomment the block marked below.
I then receive this error on Windows:
LoadError: StateError("Unknown error")
On Ubuntu:
StateError("Socket operation on non-socket")
Source code:
using ZMQ
using WebSockets
using JSON3

const uri = "wss://ws.okex.com:8443/ws/v5/public"

function produce_string()
    return "hi"
end

function main()
    payload = Dict(
        :op => "subscribe",
        :args => [
            Dict(
                "channel" => "books50-l2-tbt",
                "instType" => "Futures",
                "instId" => "FIL-USD-220325",
            ),
        ],
    )

    # Unblock this code to produce error
    # @async while true
    #     WebSockets.open(uri) do ws
    #         confirmation = true
    #         if isopen(ws)
    #             write(ws, JSON3.write(payload))
    #         end
    #     end
    # end

    ctx = Context()
    zmq_socket = Socket(ctx, PUB)
    addr = "tcp://localhost:" * string(8093)
    ZMQ.connect(zmq_socket, addr)
    sleep(3)
    ZMQ.send(zmq_socket, "hi")
    while true
        my_string = produce_string()
        ZMQ.send(zmq_socket, my_string)
        println("sent")
        sleep(1)
    end
end

main()
This seems to be, at least in part, a bug (or hard-to-understand behavior), so I suggest you open an issue on the repo. Perhaps it's related to: Test Error: Assertion failed: Socket operation on non-socket #147.
However, we can do our best to understand what's gone wrong and perhaps find a workaround. ZMQ.jl uses libzmq to handle sockets at a low level, so it might interfere with Julia's own handling of file descriptors, and we may have a race condition. Let's test that hypothesis by modifying your code a bit:
@async WebSockets.open(uri) do ws
    while true
        if isopen(ws)
            msg = JSON3.write(payload)
            write(ws, msg)
            display(ws.socket.bio)
            break
        end
    end
end
sleep(0.1)
ctx = Context()
zmq_socket = Socket(ctx, PUB)
dump(zmq_socket)
addr = "tcp://localhost:" * string(8093)
ZMQ.connect(zmq_socket, addr)
sleep(3)
ZMQ.send(zmq_socket, "hi")
I just changed some things to get the code to print out the necessary information. We see:
Socket
  data: Ptr{Nothing} @0x0000000001d5e590
  pollfd: FileWatching._FDWatcher
    handle: Ptr{Nothing} @0x00000000018b7970
    fdnum: Int64 31
    refcount: Tuple{Int64, Int64}
      1: Int64 1
      2: Int64 0
    notify: Base.GenericCondition{Base.Threads.SpinLock}
      waitq: Base.InvasiveLinkedList{Task}
        head: Nothing nothing
        tail: Nothing nothing
      lock: Base.Threads.SpinLock
        owned: Int64 0
    events: Int32 0
    active: Tuple{Bool, Bool}
      1: Bool false
      2: Bool false
and
TCPSocket(RawFD(31) paused, 0 bytes waiting)
The pollfd.fdnum field is 31, the same file descriptor number as the TCPSocket, so the two libraries may be handling the same descriptor, which would explain the errors above.
What can we do?
In the code above I already made one change to your original code: I moved the while loop inside the call to WebSockets.open. Do you really want to open a new socket on every iteration? Secondly, we can synchronize the tasks a bit, to make sure the WebSocket has finished opening before calling ZMQ:
function main()
    payload = Dict(
        :op => "subscribe",
        :args => [
            Dict(
                "channel" => "books50-l2-tbt",
                "instType" => "Futures",
                "instId" => "FIL-USD-220325",
            ),
        ],
    )

    msg_channel = Channel(1)
    @async WebSockets.open(uri) do ws
        while true
            if isopen(ws)
                msg = JSON3.write(payload)
                put!(msg_channel, msg)
                write(ws, msg)
            end
        end
    end
    println(take!(msg_channel))

    ctx = Context()
    zmq_socket = Socket(ctx, PUB)
    addr = "tcp://localhost:" * string(8093)
    ZMQ.connect(zmq_socket, addr)
    sleep(3)
    ZMQ.send(zmq_socket, "hi")
    while true
        my_string = produce_string()
        ZMQ.send(zmq_socket, my_string)
        println("sent")
        sleep(1)
    end
end
Here I've used a Channel to communicate between the tasks. This ensures that the WebSocket is done opening before we continue to the ZMQ code. It also makes the async task block on put! once the channel fills up again, instead of writing in a tight loop. Hopefully you can adapt it to fit your use case.
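If it helps to see the handshake on its own, here is a minimal sketch that uses only Base Julia, with no WebSockets or ZMQ involved. The names start_worker and run_demo are made up for illustration, and the sleep call merely stands in for opening the connection; it is not how either library behaves internally.

```julia
# Hypothetical stand-in for the WebSocket task: it does some "setup",
# signals readiness once through the channel, then carries on.
function start_worker(ready::Channel{Bool}, events::Vector{String})
    return @async begin
        sleep(0.1)                              # stands in for opening the connection
        push!(events, "worker: connection ready")
        put!(ready, true)                       # tell the main task it may proceed
        push!(events, "worker: continuing")
    end
end

function run_demo()
    events = String[]
    ready = Channel{Bool}(1)                    # buffered, so the single put! does not block
    worker = start_worker(ready, events)
    take!(ready)                                # blocks until the worker has signalled
    push!(events, "main: safe to create the ZMQ socket now")
    wait(worker)                                # let the worker finish before returning
    return events
end

events = run_demo()
foreach(println, events)
```

In your real code, the put! would go inside the WebSockets.open do-block, and the Context() and Socket(ctx, PUB) calls would come after the take!.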