python, subprocess, python-asyncio

How to create my own pipe in asyncio.create_subprocess_exec


I have a program that pulls files over the network (p4 print fetches a file from the version control server and prints it to stdout). Network and I/O are the biggest bottleneck, so I am trying to use asyncio. I tried the standard asyncio.subprocess.PIPE, but because I run multiple subprocesses I keep getting deadlocks. The solution I want to try is to create a new file and have stdout write to it.

Here are some of my attempts and the errors they produced:

Attempt 2: Error "OSError: [Errno 9] Bad file descriptor"

async def _subprocess_wrapper(self, path):
    async with self.sem:
        _, write = os.pipe()
        proc = await asyncio.create_subprocess_exec(
            'p4', 'print', '-q', path,
            stdout=write,
            stderr=write
        )
        status = await proc.wait()
        file = os.fdopen(write, 'r')
        txt  = file.read()
        os.close(write)
        os.close(_)
        return status, txt

Attempt 3: Error "AttributeError: 'NoneType' object has no attribute 'read'"

async def _subprocess_wrapper(self, path):
    async with self.sem:
        _, write = os.pipe()
        proc = await asyncio.create_subprocess_exec(
            'p4', 'print', '-q', path,
            stdout=write,
            stderr=write
        )
        status = await proc.wait()
        if status != 0:
            txt = await proc.stderr.read()
        else:
            txt = await proc.stdout.read()
        os.close(write)
        os.close(_)
        return status, txt.decode()

Any help would be appreciated.


Solution

  • I gave up on trying to use my own pipe and changed my wait() to communicate(), as per the docs:
    "[wait] can deadlock when using stdout=PIPE or stderr=PIPE and the child process generates so much output that it blocks waiting for the OS pipe buffer to accept more data. Use the communicate() method when using pipes to avoid this condition."

    My working code:

    async def _subprocess_wrapper(self, path):
        async with self.sem:
            proc = await asyncio.create_subprocess_exec(
                'p4', 'print', '-q', path,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await proc.communicate()
            txt = stdout if proc.returncode == 0 else stderr
            return proc.returncode, txt.decode()
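
For comparison, the original idea of sending output to a file can also work. In Attempt 2 the write end of the pipe is opened for reading, which is most likely what raises "Bad file descriptor", and an os.pipe() has the same limited buffer as PIPE, so wait() can still block. In Attempt 3, proc.stdout and proc.stderr are None because neither was set to asyncio.subprocess.PIPE. Below is a minimal sketch that redirects to a temporary file instead, so wait() has no pipe buffer to fill. The helper name run_to_tempfile is hypothetical, and sys.executable stands in for p4 so the example runs anywhere; in the real program the command would be 'p4', 'print', '-q', path.

```python
import asyncio
import sys
import tempfile


async def run_to_tempfile(*cmd):
    """Run cmd with stdout and stderr redirected to one temporary file.

    Hypothetical helper: a regular file has no fixed-size pipe buffer,
    so awaiting proc.wait() cannot block on unread output.
    """
    with tempfile.TemporaryFile() as out:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=out,                        # child writes straight to the file
            stderr=asyncio.subprocess.STDOUT,  # merge stderr into the same file
        )
        status = await proc.wait()
        out.seek(0)  # rewind before reading what the child wrote
        return status, out.read().decode()


if __name__ == "__main__":
    # Stand-in command that produces more output than a typical pipe buffer.
    status, txt = asyncio.run(
        run_to_tempfile(sys.executable, "-c", "print('x' * 200000)")
    )
    print(status, len(txt))
```

The trade-off is an extra round trip through the filesystem, and the file is read synchronously here, so communicate() is usually the simpler choice unless the output is too large to hold in memory.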
    

    If anyone knows a better way to make this scale, I would appreciate the insight.
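
On scaling, one common pattern is to keep the communicate()-based wrapper and launch all fetches at once with asyncio.gather, letting a semaphore cap how many subprocesses run concurrently. The following is a sketch under assumptions, not a tuned implementation: the names fetch and fetch_all and the limit of 8 are hypothetical, and sys.executable stands in for p4 so the example is runnable without a Perforce server.

```python
import asyncio
import sys

MAX_CONCURRENT = 8  # assumed limit; tune for the server and network


async def fetch(sem, *cmd):
    """Run one command and return (returncode, decoded output)."""
    async with sem:  # at most `limit` subprocesses run at a time
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # communicate() drains both pipes while waiting, avoiding the deadlock
        stdout, stderr = await proc.communicate()
        txt = stdout if proc.returncode == 0 else stderr
        return proc.returncode, txt.decode()


async def fetch_all(commands, limit=MAX_CONCURRENT):
    """Run every command concurrently; results keep the input order."""
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(fetch(sem, *cmd) for cmd in commands))


if __name__ == "__main__":
    # Stand-in commands; in practice each would be ('p4', 'print', '-q', path).
    commands = [(sys.executable, "-c", f"print({i})") for i in range(5)]
    for status, txt in asyncio.run(fetch_all(commands, limit=2)):
        print(status, txt.strip())
```

asyncio.gather returns results in the same order as its inputs, so each result can be matched back to its path. The right semaphore limit depends on what the server tolerates and is best found by measurement.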