
Safely pass data from a subprocess into async task with asyncio.Queue


I have a directory structure in an Amazon Linux EC2 instance. I would like to have a Python script asynchronously monitor this directory (and all subdirectories) for file creations.

I've decided to run inotifywait in a subprocess and pass the output into an async Task for processing. I run the subprocess and monitor the output in its own thread, and pass the stdout into an asyncio.Queue() using put_nowait() which is watched by an asyncio Task running on the main thread.

import asyncio
import subprocess
import threading

def watch_dir(dir_to_watch: str, output_queue: asyncio.Queue):
    inotify_cmd = f'sudo inotifywait -e create -m -r {dir_to_watch}'
    proc = subprocess.Popen(inotify_cmd,
                            stdout=subprocess.PIPE,
                            shell=True)

    while True:
        line = proc.stdout.readline().rstrip()
        if not line:
            break
        output_queue.put_nowait(line)


async def process_lines(input_queue: asyncio.Queue):
    while True:
        line = await input_queue.get()
        # do stuff with line

if __name__ == '__main__':
    q = asyncio.Queue()
    dir_watch_thread = threading.Thread(target=watch_dir, args=(_dir_to_watch, q))
    dir_watch_thread.start()
    asyncio.run(process_lines(q))

Is there a better, more performant/resource efficient way to do this? Is this even a safe usage of asyncio.Queue()? I've read things about janus, which describes itself as the safe way to pass data through a queue between synchronous and asynchronous contexts. Do I need to use such a data structure (and why)? I don't want to include an additional dependency if I do not have to.


Solution

  • Is this even a safe usage of asyncio.Queue?

    No, because asyncio.Queue is not thread-safe. You might even observe the symptom: the coroutine reading from the queue doesn't notice a new item immediately, and only wakes up when some unrelated IO or timeout event happens on the event loop.

    One way to fix the issue is to use call_soon_threadsafe:

    # this requires you to pass "loop" as well
    loop.call_soon_threadsafe(output_queue.put_nowait, line)
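    For completeness, here is how that fix could look applied to the question's threaded design. This is a minimal, untested sketch (names mirror the question's code; the event loop is captured in the async context and handed to the thread):

```python
import asyncio
import subprocess
import threading

def watch_dir(dir_to_watch, output_queue, loop):
    proc = subprocess.Popen(
        ['sudo', 'inotifywait', '-e', 'create', '-m', '-r', dir_to_watch],
        stdout=subprocess.PIPE)
    for line in proc.stdout:
        # Schedule the put on the event loop's own thread; this wakes the
        # loop immediately instead of waiting for an unrelated event.
        loop.call_soon_threadsafe(output_queue.put_nowait, line.rstrip())

async def process_lines(dir_to_watch):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=watch_dir,
                     args=(dir_to_watch, queue, loop),
                     daemon=True).start()
    while True:
        line = await queue.get()
        print(line)  # do stuff with line
```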
    

    A better way is to use asyncio's own subprocess handling, which allows you to avoid threads altogether. For example (untested):

    async def watch_dir(dir_to_watch, output_queue):
        proc = await asyncio.create_subprocess_exec(
            'sudo', 'inotifywait', '-e', 'create', '-m',
            '-r', dir_to_watch, stdout=asyncio.subprocess.PIPE)
        while True:
            line = await proc.stdout.readline()
            if not line:
                break
            await output_queue.put(line.rstrip())
    
    async def process_lines(dir_to_watch):
        queue = asyncio.Queue()
        # run watch_dir() in the "background"
        asyncio.create_task(watch_dir(dir_to_watch, queue))
        while True:
            line = await queue.get()
            print(line)  # ...
    
    if __name__ == '__main__':
        asyncio.run(process_lines(_dir_to_watch))
    

    In the above code I have replaced the use of shell=True with explicit arguments to avoid the possibility of shell injection, especially relevant with sudo.
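    To illustrate the injection concern: if a shell command string were ever unavoidable, quoting the untrusted argument with shlex.quote keeps it a single token. The directory name below is a contrived hostile-looking example:

```python
import shlex

# untrusted input that would run "rm -rf /" if interpolated unquoted
# into a shell=True command string
dir_to_watch = "dir with spaces; rm -rf /"

cmd = f"inotifywait -e create -m -r {shlex.quote(dir_to_watch)}"
print(cmd)
# the quoted form keeps the whole string as one argument to inotifywait
```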

  • Is there a better, more performant/resource efficient way to do this?

    In a simple single-producer single-consumer setup you can do away with the queue entirely and use an async generator instead:

    async def watch_dir(dir_to_watch):
        proc = await asyncio.create_subprocess_exec(
            'sudo', 'inotifywait', '-e', 'create', '-m',
            '-r', dir_to_watch, stdout=asyncio.subprocess.PIPE)
        while True:
            line = await proc.stdout.readline()
            if not line:
                break
            yield line.rstrip()
    
    async def process_lines(dir_to_watch):
        async for line in watch_dir(dir_to_watch):
            print(line)  # ...
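
    The async-generator pattern can be tried without inotifywait by swapping in a harmless child process; a self-contained sketch (the Python child command here is purely illustrative):

```python
import asyncio
import sys

async def stream_lines(*cmd):
    # Same shape as watch_dir above: spawn a child, yield its stdout
    # lines as they arrive, stop at EOF.
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE)
    while True:
        line = await proc.stdout.readline()
        if not line:  # EOF: the child exited
            break
        yield line.rstrip()
    await proc.wait()

async def main():
    # A trivial child process standing in for inotifywait.
    child = [sys.executable, '-c', "print('a'); print('b')"]
    return [line async for line in stream_lines(*child)]

print(asyncio.run(main()))  # → [b'a', b'b']
```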