Search code examples
pythonpython-3.xasync-awaitpython-asyncio

How to continually receive input and parse it in Python?


I imagine asyncio to be able to start a process in the background without blocking execution flow with tasks. After all, the docs state that asyncio.create_task schedules the task's execution and gives an example of "reliable 'fire-and-forget' background tasks" that creates and schedules tasks one-by-one.

I want to use asyncio to accept input and begin the parsing of the command while still accepting further input. Here's a quick example:

import asyncio
from time import sleep

class Test:
    def __init(self):
        self.calculating = False

    def calculate(self):
        # begin "calculation"
        n = 0
        self.calculating = True
        while self.calculating:
            sleep(1)
            n += 1
            print(n)
        self.calculating = False

    def stop(self):
        # causes calculation to end
        self.calculating = False

    async def parse(self, cmd):
        if cmd == "begin":
            self.calculate()
        elif cmd == "stop":
            self.stop()

async def main():
    t = Test()
    while True:
        cmd = input()
        task = asyncio.create_task(t.parse(cmd))
        await task
        

if __name__ == "__main__":
    asyncio.run(main())

Without awaiting the task, the command is never parsed. Awaiting the task does make the "calculation" begin when "begin" is inputted, as expected. However, the task is blocking, so there is never a chance for the user to input a stop command.

The examples of asyncio that I have seen are when the problems to be computed are known before running an event loop. For example, opening and downloading a given list of sites. This would be done with the asyncio.gather method on a bunch of tasks. But this isn't exactly my situation and I'm surprised that there isn't a wealth of examples that fit my use case.

What am I doing wrong? Might I not be using asyncio as intended? Or is my usage of input() and print() wrong, with some other alternative being more appropriate (i.e. logging)?


Solution

  • Figured it out using concurrent.futures.ThreadPoolExecutor and threading.Event.

    from concurrent.futures import ThreadPoolExecutor
    from threading import Event
    
    class Test:
        def __init__(self):
            self.future = 0
        def calculate(self, stop_event):
            # begin "calculation"
            n = 0
            stop_event.clear()
            while True:
                if stop_event.is_set():
                    break
                n += 1
                print(n)
    
        def parse(self, cmd, executor, stop_event):
            if cmd == "begin":
                print("Starting future")
                self.future = executor.submit(self.calculate, stop_event)
            elif cmd == "stop":
                if self.future.running():
                    print("Stopping future")
                    stop_event.set()
                else:
                    print("Future not running")
    
    def main():
        t = Test()
        with ThreadPoolExecutor(max_workers=1) as executor:
            t.future = executor.submit(lambda: None)
            stop_event = Event()
            
            while True:
                cmd = input("Command: ")
                if not cmd:
                    continue
                t.parse(cmd, executor, stop_event)
            
    
    if __name__ == "__main__":
        main()
    

    Quirks:

    • doesn't work in Python's IDLE shell.
    • input() dialog is somehow displayed after print statements that should following it (i.e. "Stopping future\n{n}\nCommand: ")

    I still don't entirely understand why asyncio wasn't appropriate for the problem while this implementation is. I would be happy to hear any explanation regarding this, or the quirks that I experienced.

    Credit to thomashle's sunfish for the example.