Tags: python, amazon-web-services, async-await, python-asyncio

Python 3 asyncio with aioboto3 seems sequential


I am porting a simple Python 3 script to AWS Lambda. The script is simple: it gathers information from a dozen S3 objects and returns the results.

The script used multiprocessing.Pool to gather all the files in parallel. However, multiprocessing cannot be used in an AWS Lambda environment, since /dev/shm is missing there. So instead of writing a dirty multiprocessing.Process / multiprocessing.Queue replacement, I thought I would try asyncio instead.
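
For context, the multiprocessing.Pool approach looked roughly like this (a reconstructed sketch, not the original code; the fetch helper and pool size are illustrative):

import boto3
from multiprocessing import Pool

def fetch(key):
    # Each worker process creates its own client and downloads one object.
    s3 = boto3.client('s3')
    return s3.get_object(Bucket='some-bucket', Key=key)['Body'].read()

def download_pool(keys):
    # multiprocessing.Pool relies on /dev/shm, which is not mounted on AWS
    # Lambda, so this approach fails there.
    with Pool(10) as pool:
        return pool.map(fetch, keys)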

I am using the latest version of aioboto3 (8.0.5) on Python 3.8.

My problem is that I cannot see any improvement between a naive sequential download of the files and an asyncio event loop multiplexing the downloads.

Here are the two versions of my code.

import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import boto3
import aioboto3

BUCKET = 'some-bucket'
KEYS = [
    'some/key/1',
    [...]
    'some/key/10',
]

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        obj = s3.get_object(Bucket=BUCKET, Key=key)
        obj['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    #loop.set_default_executor(ProcessPoolExecutor(10))
    #loop.set_default_executor(ThreadPoolExecutor(10))
    loop.run_until_complete(download_aio())
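
For reference, the two versions can be compared with a minimal timing harness along these lines (an illustrative sketch, not the original measurement code):

import time

def timed(label, func):
    # Run func once and print the wall-clock time it took.
    start = time.perf_counter()
    func()
    print(f'{label}: {time.perf_counter() - start:.2f}s')

timed('sequential', run_sequential)
timed('concurrent', run_concurrent)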

The timing for both run_sequential() and run_concurrent() is quite similar (~3 seconds for a dozen 10 MB files). I am convinced the concurrent version is not actually concurrent, for multiple reasons:

  • I tried switching to ProcessPoolExecutor / ThreadPoolExecutor, and I can see the processes/threads being spawned for the duration of the function, but they are doing nothing
  • The timing of the sequential and concurrent versions is almost identical, even though my network interface is definitely not saturated and the CPU is not bound either
  • The time taken by the concurrent version increases linearly with the number of files.

I am sure something is missing, but I just can't wrap my head around what.

Any ideas?


Solution

  • After losing some hours trying to understand how to use aioboto3 correctly, I decided to just switch to my backup solution. I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment.

    If someone stumbles across this thread in the future, here it is. It is far from perfect, but it is easy enough to drop in as a replacement for multiprocessing.Pool in my simple cases.

    from multiprocessing import Process, Pipe
    from multiprocessing.connection import wait
    
    
    class Pool:
        """Naive implementation of a process pool with mp.Pool API.
    
        This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
        is not mounted in an AWS Lambda environment.
        """
    
        def __init__(self, process_count=1):
            assert process_count >= 1
            self.process_count = process_count
    
        @staticmethod
        def wrap_pipe(pipe, index, func):
            def wrapper(args):
                try:
                    result = func(args)
                except Exception as exc:  # pylint: disable=broad-except
                    result = exc
                pipe.send((index, result))
            return wrapper
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_value, exc_traceback):
            pass
    
        def map(self, function, arguments):
            pending = list(enumerate(arguments))
            running = []
            finished = [None] * len(pending)
            while pending or running:
                # Fill the running queue with new jobs
                while len(running) < self.process_count:
                    if not pending:
                        break
                    index, args = pending.pop(0)
                    pipe_parent, pipe_child = Pipe(False)
                    process = Process(
                        target=Pool.wrap_pipe(pipe_child, index, function),
                        args=(args, ))
                    process.start()
                    running.append((index, process, pipe_parent))
                # Wait for jobs to finish
                for pipe in wait([r[2] for r in running]):
                    index, result = pipe.recv()
                    # Join the finished process and remove it from the running list
                    _, process, _ = next(r for r in running if r[0] == index)
                    process.join()
                    running = [r for r in running if r[0] != index]
                    # Add the result to the finished list
                    finished[index] = result
    
            return finished
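
    For illustration, this is roughly how the class can stand in for multiprocessing.Pool in the S3 use case from the question (assuming BUCKET and KEYS as defined there; the fetch helper is illustrative):

    import boto3

    def fetch(key):
        # Runs in a worker process: create a client and read one object.
        s3 = boto3.client('s3')
        return s3.get_object(Bucket=BUCKET, Key=key)['Body'].read()

    with Pool(process_count=10) as pool:
        buffers = pool.map(fetch, KEYS)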