python-asyncio, zeromq, pyzmq

Inter-process communication between async and sync tasks using PyZMQ


In a single process I have a task running on a thread that produces values and broadcasts them, and several consumer async tasks that run concurrently in an asyncio loop.

I found this issue on PyZMQ's GitHub asking about async <-> sync communication with inproc sockets, which is what I also wanted, and the answer was to use .shadow(ctx.underlying) when creating the async ZMQ Context.

I prepared this example and it seems to be working fine:

import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json


def producer(ctrl):
    # delay first push to give asyncio loop time
    # to start
    time.sleep(1)

    ctx = ctrl["ctx"]

    s = ctx.socket(zmq.PUB)

    s.bind(ctrl["endpoint"])

    v = 0
    while ctrl["run"]:
        payload = {"value": v, "timestamp": time.time()}

        msg = json.dumps(payload).encode("utf-8")

        s.send(msg)
        v += 1
        time.sleep(5)

    print("Bye")


def main():
    endpoint = "inproc://testendpoint"
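    ctx = zmq.Context()
    # shadow the same underlying libzmq context: inproc:// endpoints are
    # only reachable from sockets created on that same context
    actx = zmq.asyncio.Context.shadow(ctx.underlying)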

    ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }

    th = threading.Thread(target=producer, args=(ctrl,))
    th.start()

    try:
        asyncio.run(amain(actx, endpoint))
    except KeyboardInterrupt:
        pass

    print("Stopping thread")
    ctrl["run"] = False
    th.join()


async def amain(ctx, endpoint):
    s = ctx.socket(zmq.SUB)
    s.subscribe("")
    s.connect(endpoint)

    loop = asyncio.get_running_loop()

    def stop():
        try:
            print("Closing zmq async socket")
            s.close()
        except Exception:
            pass

        raise KeyboardInterrupt

    loop.add_signal_handler(signal.SIGINT, stop)

    while True:
        event = await s.poll(1000)
        if event & zmq.POLLIN:
            msg = await s.recv()
            payload = json.loads(msg.decode("utf-8"))

            print("%f: %d" % (payload["timestamp"], payload["value"]))


if __name__ == "__main__":
    sys.exit(main())

Is it safe to use inproc://* between a thread and an asyncio task in this way? The 0MQ context is thread-safe and I'm not sharing sockets between the thread and the asyncio task, so I would say that in general this is thread-safe, right? Or am I missing something that I should consider?


Solution

  • Q :
    " Is it safe to use inproc://* between a thread and asyncio task in this way? "

    A :
    First and foremost, I might be awfully wrong (not only here), yet having worked with ZeroMQ since native API 2.1.1+, I dare claim that the answer here is YES, provided newer "improvements" have not lost the core principles (the ZeroMQ ZMTP/RFC-documented properties for building a legal implementation of the still-valid ZMTP arsenal) and newer releases of the pyzmq binding have kept all mandatory properties of the inproc: Transport-Class without compromise.
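
    A minimal sketch of the pattern the question relies on, assuming pyzmq on libzmq 4.x and Python 3.9+ (for asyncio.to_thread): a plain thread owns a blocking PAIR socket, an asyncio task owns an awaitable PAIR socket, and both sockets come from one libzmq context via zmq.asyncio.Context.shadow(ctx.underlying). The endpoint name and the producer/run functions are hypothetical; PAIR is used only so the asyncio side can acknowledge before the thread closes its socket.

```python
import asyncio
import threading

import zmq
import zmq.asyncio

ENDPOINT = "inproc://shadow-demo"  # hypothetical endpoint name


def producer(ctx: zmq.Context, count: int) -> None:
    # Plain thread: creates, uses and closes its own blocking socket.
    s = ctx.socket(zmq.PAIR)
    s.connect(ENDPOINT)
    for i in range(count):
        s.send_json({"value": i})
    s.recv()  # wait for the asyncio side's acknowledgement before closing
    s.close()


def run(count: int = 3) -> list:
    ctx = zmq.Context()
    # Same underlying libzmq context, so the inproc:// endpoint is visible to both sides.
    actx = zmq.asyncio.Context.shadow(ctx.underlying)

    async def consume() -> list:
        # asyncio task: creates, uses and closes its own awaitable socket.
        s = actx.socket(zmq.PAIR)
        s.bind(ENDPOINT)  # bind before the thread connects
        th = threading.Thread(target=producer, args=(ctx, count))
        th.start()
        received = [await s.recv_json() for _ in range(count)]
        await s.send(b"ack")
        await asyncio.to_thread(th.join)  # join without blocking the event loop
        s.close()
        return received

    received = asyncio.run(consume())
    ctx.term()
    return received
```

    Neither socket is ever touched by the other side's thread; only the underlying context is common to both.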

    Q :
    " The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say in general that this is thread safe, right? "

    A :
    Here my troubles start: ZeroMQ implementations have since ever been developed on Martin SUSTRIK's & Pieter HINTJENS' Zen-of-Zero, i.e. also Zero-sharing, so never sharing was the principle (though "sharing" zmq.Context instances among different threads was no problem, unlike zmq.Socket instances).
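
    The "share the Context, never the Socket" rule can be sketched as below, assuming plain pyzmq with threads (the endpoint and function names are hypothetical): the only object handed to every thread is the zmq.Context; each worker thread creates, uses and closes its own PUSH socket, and the main thread alone owns the PULL socket.

```python
import threading

import zmq

ENDPOINT = "inproc://fan-in"  # hypothetical endpoint name


def worker(ctx: zmq.Context, worker_id: int, n_msgs: int) -> None:
    # The Context is shared; this PUSH socket belongs to this thread alone.
    s = ctx.socket(zmq.PUSH)
    s.connect(ENDPOINT)
    for seq in range(n_msgs):
        s.send_json({"worker": worker_id, "seq": seq})
    s.close()


def fan_in(n_workers: int = 4, n_msgs: int = 5) -> list:
    ctx = zmq.Context()          # one Context for the whole process
    pull = ctx.socket(zmq.PULL)  # used only by the main thread
    pull.bind(ENDPOINT)          # bind before any worker connects
    threads = [
        threading.Thread(target=worker, args=(ctx, w, n_msgs))
        for w in range(n_workers)
    ]
    for t in threads:
        t.start()
    received = [pull.recv_json() for _ in range(n_workers * n_msgs)]
    for t in threads:
        t.join()
    pull.close()
    ctx.term()
    return received
```

    Messages from one worker arrive in the order they were sent, while messages from different workers are fair-queued and may interleave.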

    Python (since ever, and still valid in 2022-Q1) uses the GIL lock as a total avoider of [CONCURRENT] code execution: the GIL lock re-[SERIAL]-ises the flow of Python code execution, which in principle keeps the problems arising from [CONCURRENT] code execution from ever happening inside it. So even if the asyncio part is built as a pythonic (non-destructive) part of the ecosystem, your code should never "meet" any such concurrency-related issue, because unless a thread gains the GIL lock, it does nothing but "hang cracking NOPs" (nuts-cracking in an idle loop).
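
    On the asyncio side the serialisation is even more direct: all tasks scheduled on one event loop run on the single OS thread that called asyncio.run(), so an async socket used only by those tasks is never touched by the producer thread. The sketch below (task and function names are hypothetical) checks that property; it illustrates event-loop scheduling, not the GIL itself.

```python
import asyncio
import threading


async def consumer(name: str, seen: dict) -> None:
    for _ in range(3):
        # record which OS thread is executing this task right now
        seen.setdefault(name, set()).add(threading.get_ident())
        await asyncio.sleep(0)  # yield so the other tasks get scheduled in between


async def amain(n_tasks: int) -> dict:
    seen: dict = {}
    await asyncio.gather(*(consumer(f"task-{i}", seen) for i in range(n_tasks)))
    return seen


def thread_ids_used(n_tasks: int = 4) -> set:
    seen = asyncio.run(amain(n_tasks))
    return set().union(*seen.values())
```

    Called from the main thread, thread_ids_used() returns a set containing just the main thread's identifier, however many tasks interleave.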

    Being inside the same process, there seems to be no advantage in spawning another Context instance at all (this has been a rock-solid certainty since ever, so as never to increase any kind of overhead: Zen-of-Zero, (almost) Zero-overhead ...). If performance or latency needs required it, the Sig/Msg core engine was powered with more zmq.Context( IOthreads ) upon instantiation, yet these were zmq.Context-owned threads, not Python-GIL-governed/(b)locked ones, so performance scaled pretty well, without wasting any RAM/HWM/buffers/... resources and without growing any overheads. It was also very efficient, as the I/O threads were there only for actual I/O work, so they are not needed at all for the inproc: (protocol-less) Transport-Class.
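
    A single Context also matters for correctness, not only for overhead: inproc:// endpoints are resolved inside one libzmq context, which is why the question shadows ctx.underlying instead of creating a second context. A sketch under those assumptions (libzmq 4.x, which allows connecting to an inproc endpoint before anything is bound; the endpoint names and helper functions are hypothetical) that sends one message in three setups:

```python
import zmq


def inproc_delivers(bind_ctx: zmq.Context, connect_ctx: zmq.Context,
                    endpoint: str, timeout_ms: int = 200) -> bool:
    """Send one message over inproc:// and report whether it arrived in time."""
    pull = bind_ctx.socket(zmq.PULL)
    pull.bind(endpoint)
    push = connect_ctx.socket(zmq.PUSH)
    push.connect(endpoint)  # libzmq 4.x accepts this even with no visible binder
    try:
        push.send(b"ping", flags=zmq.NOBLOCK)
    except zmq.Again:
        pass  # could not even be queued: treat as "not delivered"
    arrived = bool(pull.poll(timeout_ms) & zmq.POLLIN)
    # linger=0 drops anything still queued so the contexts can terminate promptly
    push.close(linger=0)
    pull.close(linger=0)
    return arrived


def compare() -> dict:
    results = {}

    ctx = zmq.Context()
    results["same context"] = inproc_delivers(ctx, ctx, "inproc://demo-same")
    ctx.term()

    ctx_a, ctx_b = zmq.Context(), zmq.Context()
    results["two contexts"] = inproc_delivers(ctx_a, ctx_b, "inproc://demo-two")
    ctx_a.term()
    ctx_b.term()

    ctx = zmq.Context()
    shadow = zmq.Context.shadow(ctx.underlying)  # same libzmq context, new Python object
    results["shadow"] = inproc_delivers(ctx, shadow, "inproc://demo-shadow")
    ctx.term()
    return results
```

    In the "two contexts" case the connect call still succeeds, but the message cannot reach the PULL socket bound in the other context, so the poll simply times out.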

    Q :
    " Or am I missing something that I should consider? "

    A :
    Mixing asyncio, O/S signals (whose interaction with the native ZeroMQ API is well documented) and other layers of complexity is certainly possible, yet it comes at a cost: it makes the use case less and less readable and more and more prone to conceptual gaps and similar hard-to-decode "errors".
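
    One way to keep fewer layers interacting is to let SIGINT only set a flag and let each side shut down on its own, instead of raising KeyboardInterrupt from a loop signal handler. This is a sketch, not the answer author's recipe: it assumes a Unix event loop for add_signal_handler (falling back silently elsewhere), swaps the shared dict flag for a threading.Event, has the asyncio side bind first so the producer needs no startup sleep, and adds a hypothetical duration parameter so it can also stop itself.

```python
import asyncio
import signal
import threading
import time
from typing import Optional

import zmq
import zmq.asyncio

ENDPOINT = "inproc://values"  # hypothetical endpoint name


def producer(ctx: zmq.Context, stop: threading.Event, period: float) -> None:
    s = ctx.socket(zmq.PUB)
    s.connect(ENDPOINT)  # the asyncio side has already bound, so no startup sleep
    v = 0
    while not stop.is_set():
        s.send_json({"value": v, "timestamp": time.time()})
        v += 1
        stop.wait(period)  # sleeps, but returns early as soon as stop is set
    s.close(linger=0)


def run(duration: Optional[float] = None, period: float = 0.05) -> list:
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)
    stop_thread = threading.Event()
    received: list = []

    async def amain() -> None:
        stop = asyncio.Event()
        loop = asyncio.get_running_loop()
        try:
            # Ctrl-C only sets a flag; nothing is raised from the handler
            loop.add_signal_handler(signal.SIGINT, stop.set)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows event loops, or not running in the main thread
        if duration is not None:
            loop.call_later(duration, stop.set)  # hypothetical self-stop

        s = actx.socket(zmq.SUB)
        s.subscribe(b"")
        s.bind(ENDPOINT)
        th = threading.Thread(target=producer, args=(ctx, stop_thread, period))
        th.start()

        while not stop.is_set():
            # short poll timeout so the stop flag is re-checked regularly
            if (await s.poll(100)) & zmq.POLLIN:
                received.append(await s.recv_json())

        stop_thread.set()
        await asyncio.to_thread(th.join)  # wait for the producer without blocking the loop
        s.close(linger=0)

    asyncio.run(amain())
    ctx.term()
    return received
```

    Because PUB/SUB subscriptions propagate asynchronously, the first value or two may be dropped; called with no duration, run() keeps going until Ctrl-C.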

    I remember using Tkinter's mainloop() as a very cheap and super-stable framework for rapid-prototyping the MVC-{ M-odel, V-isual, C-ontroller } parts of indeed many-actor applications in Python. There were Zero problems using ZeroMQ with a single Context instance and passing references to the respective AccessNodes into whatever number of event handlers, provided we kept the ZeroMQ Zen-of-Zero, i.e. not to "share" (meaning no two parts "use", i.e. compete to use, one and the same AccessPoint "one-over-another").

    All this was designed in, at "Zero-cost", by ZeroMQ's very definition, so unless it got spoilt in some later phase of re-wrapping an already re-wrapped native API, all of this ought to still work in 2022-Q1, ought it not?