Tags: python, python-asyncio, synchronous

Python asyncio: exchanging data between two coroutines, one of which runs synchronous, CPU-intensive code


I am having trouble getting two routines to cooperate, to the point that I have started asking myself whether this is possible with asyncio at all...

I have two routines. The first performs only synchronous, CPU-intensive processing. The second runs ("proper") asyncio code. The first cannot share an event loop with the second, because the second cannot afford to wait while the long-running, CPU-intensive first routine executes. That work cannot be interrupted, so it is not well suited to an asyncio setup.

The two are otherwise unrelated, except that the asyncio (second) routine signals that the synchronous operation should be run, and the synchronous (first) routine has to inform the asyncio one that it has finished and pass it the results.

Note that I have found plenty of examples where the asyncio routine starts the other one, but none of them returns a result to another coroutine that is awaiting it. When the synchronous code finishes, I am able to obtain its results, but I don't manage to wake up the awaiting coroutine, neither by setting an event it awaits nor by having the asyncio code await a future.
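For the future-based attempt described above, a likely culprit is resolving the future directly from the worker thread: `Future.set_result` is not thread-safe, and calling it from another thread may leave the awaiting coroutine asleep. A minimal sketch, assuming a plain `threading.Thread` as the worker and a hypothetical `cpu_bound_job` standing in for the real work, hands the call to the loop's own thread with `loop.call_soon_threadsafe`; a hypothetical `ticker` task shows that the loop keeps running meanwhile:

```python
import asyncio
import threading
import time


def cpu_bound_job(n: int) -> int:
    # hypothetical stand-in for the synchronous, CPU-intensive work
    time.sleep(0.2)
    return n * n


def worker(loop: asyncio.AbstractEventLoop, fut: asyncio.Future, n: int) -> None:
    # runs in a plain thread, outside the event loop
    result = cpu_bound_job(n)
    # Future.set_result is not thread-safe: schedule it on the loop's own thread;
    # call_soon_threadsafe also wakes the loop if it is idle
    loop.call_soon_threadsafe(fut.set_result, result)


async def ticker(ticks: list) -> None:
    # keeps running while main() awaits the future, showing the loop is not blocked
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    ticks: list = []
    tick_task = asyncio.create_task(ticker(ticks))
    threading.Thread(target=worker, args=(loop, fut, 7), daemon=True).start()
    result = await fut  # resumes once the worker's result has been delivered
    tick_task.cancel()
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The same hand-off works for an `asyncio.Event`: the worker thread calls `loop.call_soon_threadsafe(event.set)` instead of `event.set()`.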

Could anyone tell me whether this setup is possible at all, or how I should set it up? Of course, the first (synchronous) routine must not block the asyncio loop, and it must still be able to hand its results to another asyncio coroutine that is awaiting them in the meantime.
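One common way to set this up, sketched here under the assumption that each synchronous job can be submitted on its own, is `loop.run_in_executor`: the blocking function runs in a pool thread and its return value is delivered to the coroutine that awaits it, while other tasks keep running. `long_synchronous_job` and `ticker` are hypothetical names used only for illustration:

```python
import asyncio
import concurrent.futures
import time


def long_synchronous_job(item: int) -> str:
    # hypothetical stand-in for the long, non-interruptible synchronous work
    time.sleep(0.3)
    return f"done {item} !"


async def ticker(ticks: list) -> None:
    # runs alongside the job to show the event loop stays responsive
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    tick_task = asyncio.create_task(ticker(ticks))
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # the function runs in a pool thread; awaiting the returned future yields its return value
        result = await loop.run_in_executor(pool, long_synchronous_job, 10)
    tick_task.cancel()
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

On Python 3.9+, `await asyncio.to_thread(long_synchronous_job, 10)` does the same using the default executor. For pure-Python CPU-bound code, threads share the GIL, so a `concurrent.futures.ProcessPoolExecutor` may be passed instead; the function and its arguments then have to be picklable.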

EDIT: added the problematic code below. The problem is that when self.finished_queue is an asyncio.Queue, awaiting it never returns (it is not informed that the synchronous loop has finished), while calling get() on a normal (non-asyncio) queue.Queue inside an async def blocks every other await in the main asyncio loop.
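An `asyncio.Queue` can still be used across threads as long as the worker thread never touches it directly. A minimal sketch, assuming the worker thread receives a reference to the running loop: the thread keeps a blocking `get()` on a `queue.Queue` for its input and pushes each result with `loop.call_soon_threadsafe(finished.put_nowait, ...)`, so `await finished.get()` wakes up normally. The function mirrors the question's `long_lasting_synchronous_loop`, and the `None` shutdown sentinel is an assumption, not part of the original code:

```python
import asyncio
import queue
import threading
import time


def long_lasting_synchronous_loop(loop: asyncio.AbstractEventLoop,
                                  unfinished: queue.Queue,
                                  finished: asyncio.Queue) -> None:
    # worker thread: a blocking get() on the thread-safe queue.Queue is fine here
    while True:
        item = unfinished.get()
        if item is None:  # assumed shutdown sentinel
            break
        time.sleep(0.1)  # stand-in for the CPU-intensive work
        # asyncio.Queue is not thread-safe: run put_nowait on the loop's thread instead
        loop.call_soon_threadsafe(finished.put_nowait, f"done {item} !")


async def main() -> list:
    loop = asyncio.get_running_loop()
    unfinished: queue.Queue = queue.Queue()
    finished: asyncio.Queue = asyncio.Queue()
    worker = threading.Thread(target=long_lasting_synchronous_loop,
                              args=(loop, unfinished, finished), daemon=True)
    worker.start()
    results = []
    for i in (10, 20):
        unfinished.put(i)  # never blocks on an unbounded queue.Queue
        results.append(await finished.get())  # suspends only this task
    unfinished.put(None)  # ask the worker to exit
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```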

class TestClass:

    def __init__(self):
        self.unfinished_queue = queue.Queue()
        self.finished_queue = queue.Queue()  # an asyncio.Queue here doesn't work properly (get() is not returning)


    async def asyncio_looping_run(self, duration: float):
        i = 0
        while True:
            i += 1
            print(f"taking a nap for {duration} seconds - {i} th time")
            await asyncio.sleep(duration)
            if i % 10 == 0:
                self.unfinished_queue.put_nowait(i)
                print("awaiting an entry to finish")

                # can't afford to be blocking here, because we are in this async def, and this would block all
                # other await'ing async defs !!!
                # SO : await'ing an asyncio.Queue should be used here, but this doesn't work !!!
                entry = self.finished_queue.get()

                print(f"{entry}")

    def long_lasting_synchronous_loop(self, msg: str):

        print(f"entered long_lasting_synchronous_loop('{msg}')")
        while True:
            print("waiting for something to do")
            input_item = self.unfinished_queue.get()
            print(f"found something to do ! - found {input_item} as input")
            print("mimicking a long synchronous operation by (synchronously) sleeping for 5 seconds")
            time.sleep(5)
            print("long synchronous operation finished ! will put it on the finished queue now")
            self.finished_queue.put_nowait(f"done {input_item} !")
            print(f"the result of {input_item} was put on the finished queue")


async def main():
    print("started for real now !")

    obj = TestClass()

    print("future 1 : outputs every 1/x second, yielding control to the asyncio loop")
    future1 = obj.asyncio_looping_run(0.1)

    print("future 2 : runs the lengthy DB operation, NOT yielding control to the asyncio loop")
    pool = concurrent.futures.ThreadPoolExecutor()
    future2 = asyncio.get_event_loop().run_in_executor(
        pool, obj.long_lasting_synchronous_loop, 'future2')

    print(f"started at {time.strftime('%X')}")

    done, pending = await asyncio.wait([future2, future1],
                                       return_when=asyncio.FIRST_COMPLETED)
    print("async main() loop exited !")


if __name__ == "__main__":

    constants.init_constants()

    try:
        asyncio.run(
            main()
        )

    except KeyboardInterrupt:
        print(f"Terminated on user request.")
    except asyncio.CancelledError:
        print(f"asyncio.CancelledError: main() terminated by user?")
    except ServerSocketBindingError as _e:
        print(_e)
        exit_code = constants.GENERAL_ERROR
    except Exception as _e:
        print(f"Terminated due to error: {_e}")
        print(f"main() terminated due to error: {_e}")
        exit_code = constants.GENERAL_ERROR
    finally:
        print(f"Handling cleanup.")

Solution

  • So, here is your code with the modifications needed to run it. It was close to complete, but you left out all the imports and referenced a "constants" object that is not in the listing. I removed that; keep in mind that a minimal working example should include the imports and boilerplate needed to run the code.

    Anyway, here is what was needed: values are sent to the other thread and retrieved from it, and they are read in the main thread by a single task, so there is no need to guard the queue against concurrent access by asyncio tasks. Since the data has to cross threads, a traditional thread-safe queue (queue.Queue) is the one to use.
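Another way to consume a `queue.Queue` from asyncio without polling is to run its blocking `get()` in the loop's default executor, so only the awaiting task waits. A sketch, with a hypothetical `producer` standing in for the synchronous loop:

```python
import asyncio
import queue
import threading
import time


def producer(finished: queue.Queue) -> None:
    # hypothetical stand-in for the synchronous loop delivering one result
    time.sleep(0.2)
    finished.put("done 10 !")


async def main() -> str:
    loop = asyncio.get_running_loop()
    finished: queue.Queue = queue.Queue()
    threading.Thread(target=producer, args=(finished,), daemon=True).start()
    # the blocking get() runs in the loop's default thread pool, so only this task waits
    return await loop.run_in_executor(None, finished.get)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off: each pending `get()` occupies one executor thread, and cancelling the awaiting task does not interrupt that thread, which stays blocked until an item arrives.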

    As for the task stopping while it waits for the queue: that is easy to resolve by polling the queue instead, passing block=False to its .get() method, which then raises queue.Empty when nothing is available. That is what I did. Finally, the arguments to asyncio.wait have to be wrapped in tasks (for example with asyncio.create_task) and cannot be raw coroutines; passing bare coroutines was deprecated in Python 3.8 and raises a TypeError since Python 3.11.
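The polling idea can be factored into a small helper. A sketch, assuming a hypothetical `poll_get` name and a fixed poll interval (`get_nowait()` is equivalent to `get(block=False)`); it also shows `asyncio.wait` receiving a task created with `asyncio.create_task` rather than a bare coroutine:

```python
import asyncio
import queue
import threading


async def poll_get(q: queue.Queue, interval: float = 0.05):
    # try a non-blocking get; on queue.Empty, yield to the event loop and retry
    while True:
        try:
            return q.get_nowait()  # same as q.get(block=False)
        except queue.Empty:
            await asyncio.sleep(interval)


async def main():
    finished: queue.Queue = queue.Queue()
    # a timer thread simulates the worker putting a result after 0.2 s
    threading.Timer(0.2, finished.put, args=("done 10 !",)).start()
    # asyncio.wait needs tasks or futures, so the coroutine is wrapped first
    task = asyncio.create_task(poll_get(finished))
    done, _pending = await asyncio.wait({task}, timeout=2)
    return task.result() if task in done else None


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The interval trades latency for wakeups: a result can sit in the queue for up to one interval before the task notices it.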

    Other than that, I just tweaked the timings so the output is easier to observe, and removed references to your constants.

    
    import asyncio, queue
    
    import concurrent.futures
    import time
    
    ServerSocketBindingError = Exception
    
    class TestClass:
    
        def __init__(self):
            self.unfinished_queue = queue.Queue()
            self.finished_queue = queue.Queue()  # thread-safe; an asyncio.Queue filled from the worker thread may never wake the awaiting get()
    
    
        async def asyncio_looping_run(self, duration: float):
            i = 0
            while True:
                i += 1
                print(f"taking a nap for {duration} seconds - {i} th time")
                await asyncio.sleep(duration)
                if i % 10 == 0:
                    self.unfinished_queue.put(i)
                    print("awaiting an entry to finish")
    
                # a blocking get() here would stall every other task on the event loop, so poll
                # the thread-safe queue on each iteration instead; get(block=False) raises
                # queue.Empty while no result is ready
                try:
                    entry = self.finished_queue.get(block=False)
                except queue.Empty:
                    continue
                else:
                    print(f"{entry}")
    
        def long_lasting_synchronous_loop(self, msg: str):
    
            print(f"entered long_lasting_synchronous_loop('{msg}')")
            while True:
                print("waiting for something to do")
                input_item = self.unfinished_queue.get()
                print(f"found something to do ! - found {input_item} as input")
                print("mimicking a long synchronous operation by (synchronously) sleeping for 2 seconds")
                time.sleep(2)
                print("long synchronous operation finished ! will put it on the finished queue now")
                self.finished_queue.put_nowait(f"done {input_item} !")
                print(f"the result of {input_item} was put on the finished queue")
    
    
    async def main():
        print("started for real now !")
    
        obj = TestClass()
    
        print("future 1 : outputs every 1/x second, yielding control to the asyncio loop")
        future1 = asyncio.create_task(obj.asyncio_looping_run(0.3))
    
        print("future 2 : runs the lengthy DB operation, NOT yielding control to the asyncio loop")
        pool = concurrent.futures.ThreadPoolExecutor()
        future2 = asyncio.get_running_loop().run_in_executor(
            pool, obj.long_lasting_synchronous_loop, 'future2')
    
        print(f"started at {time.strftime('%X')}")
    
        done, pending = await asyncio.wait([future2, future1],)
        print("async main() loop exited !")
    
    
    if __name__ == "__main__":
    
        #constants.init_constants()
    
        try:
            asyncio.run(
                main()
            )
    
        except KeyboardInterrupt:
            print(f"Terminated on user request.")
        except asyncio.CancelledError:
            print(f"asyncio.CancelledError: main() terminated by user?")
        except Exception as _e:
            print(f"Terminated due to error: {_e}")
            print(f"main() terminated due to error: {_e}")
        finally:
            print(f"Handling cleanup.")