I have an XPUB/XSUB device and a number of mock publishers running in one process. In a separate process, I want to connect a subscriber and print received message to the terminal. Below I will show two variants of a simple function to do just that. I have these functions wrapped as command-line utilities.
My problem is that the asyncio variant never receives messages.
On the other hand, the non-async variant works just fine. I have tested all cases for ipc and tcp transports. The publishing process never changes in my tests, except when I restart it to change transport. The messages are short strings and published roughly once per second, so we're not looking at performance problem.
The subscriber program sits indefinitely at the line msg = await sock.receive_multipart()
. In the XPUB/XSUB device I have instrumentation that shows the forwarding of the sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
message, same as when the non-async variant connects.
The asyncio variant (not working, as described)
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
async def task():
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
asyncio.run(task())
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
The regular blocking variant (works fine)
def subs(url, channel):
import zmq
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
def task():
while True:
msg = sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
task()
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
For this particular tool there is no need to use asyncio. However, I am experiencing this problem elsewhere in my code too, where an asynchronous recv never receives. So I'm hoping that by clearing it up in this simple case I'll understand what's going wrong in general.
My versions are
import zmq
zmq.zmq_version() # '4.3.2'
zmq.__version__ # '19.0.2'
I'm on MacOS 10.13.6.
I'm fully out of ideas. Internet, please help!
A working async variant is
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
async def task():
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
try:
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
asyncio.run(task())
I conclude that, when using asyncio zmq, sockets must be created with a call running on the event loop from which the sockets will be awaited. Even though the original form did not do anything fancy with event loops, it appears that the socket has an event loop different from that used by asyncio.run
. I'm not sure why, and I didn't open an issue with pyzmq because their docs show usage as in this answer, without comment.
Edit in response to a comment:
asyncio.run
always creates a new event loop, so the loop presumably created for the sockets instantiated outside of the co-routine passed to asyncio.run
(as in the asyncio variant in the original question) is obviously different.