Tags: python, amazon-web-services, asynchronous, python-asyncio, boto

Asyncio with boto and for range?


I'm trying the asyncio botocore implementation for the first time. However, I'm quite sure I'm not getting the expected asynchronicity, likely due to my own lack of experience with it. :)

The goal of the below method is to duplicate all files in a bucket while suffixing keys with UUIDs.

import logging
import uuid

from aiobotocore.session import get_session  # async botocore session factory

# S3Client below is only a type hint (e.g. from the types-aiobotocore-s3 stubs)

logger = logging.getLogger(__name__)


async def async_duplicate_files_in_bucket(bucket,
                                          how_many_times=1):
    session = get_session()

    async with session.create_client('s3') as s3_client:
        s3_client: S3Client

        paginator = s3_client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket):

            for file in result["Contents"]:
                # the key name already includes the prefix
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")

                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + str(uuid.uuid4())
                    copy_source = {
                        'Bucket': bucket,
                        'Key': original_file_name
                    }

                    await s3_client.copy_object(Bucket=bucket,
                                                CopySource=copy_source,
                                                Key=new_file_name)
                    print("-", end="")

When looking at the terminal:

  1. I see Duplicating file: file_1 and it doesn't move on to the next file until it has finished duplicating file_1. Only then do I get a new log line with Duplicating file: file_2.
  2. print("-", end="") doesn't print anything

Given my little experience with asyncio, I hypothesize that the for _ in range(how_many_times) is blocking the event loop.

I'd appreciate pointers on how to make better use of asyncio in Python, as well as on how to achieve the goal of this function.

Thanks.


Solution

  • You're not blocking the event loop; you're just not making proper use of asyncio concurrency! Every copy_object call is awaited immediately, so the copies run strictly one after another.

    Using the asyncio.gather() helper allows you to run a number of async operations concurrently.
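
    To see the effect in isolation, here's a tiny self-contained sketch (standard library only, nothing S3-specific; fake_copy, one_by_one and all_at_once are made-up names for illustration):

    import asyncio
    import time

    async def fake_copy():
        await asyncio.sleep(1)            # stands in for one I/O-bound copy_object call
        print("-", end="", flush=True)

    async def one_by_one():
        for _ in range(5):
            await fake_copy()             # awaited immediately -> sequential, ~5 s

    async def all_at_once():
        await asyncio.gather(*(fake_copy() for _ in range(5)))  # concurrent, ~1 s

    for coro in (one_by_one, all_at_once):
        start = time.perf_counter()
        asyncio.run(coro())
        print(f" {coro.__name__}: {time.perf_counter() - start:.1f}s")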

    Applied to your function, we first buffer all the desired copy_object operations in one big list, then run them all concurrently in a single large batch:

    import asyncio
    import uuid
    from typing import Any, Awaitable

    # get_session and logger are set up exactly as in your code above


    async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
        session = get_session()
    
        s3_client: S3Client
        async with session.create_client("s3") as s3_client:
            awaitables: list[Awaitable[Any]] = []
            paginator = s3_client.get_paginator("list_objects")
            async for result in paginator.paginate(Bucket=bucket):
    
                for file in result["Contents"]:
                    # the key name already includes the prefix
                    original_file_name: str = file["Key"]
                    logger.debug(f"Duplicating file: {original_file_name} ")
    
                    for _ in range(how_many_times):
                        new_file_name = original_file_name + "_" + str(uuid.uuid4())
                        copy_source = {"Bucket": bucket, "Key": original_file_name}
    
                        awaitable = s3_client.copy_object(
                            Bucket=bucket,
                            CopySource=copy_source,
                            Key=new_file_name,
                        )
                        awaitables.append(awaitable)
                        print("-", end="")
    
            await asyncio.gather(*awaitables)
    

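    If you're calling this from regular synchronous code, asyncio.run() can drive it; the bucket name here is just a placeholder:

    import asyncio

    # run the whole duplication end to end ("my-bucket" is a placeholder)
    asyncio.run(async_duplicate_files_in_bucket("my-bucket", how_many_times=2))
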
    Depending on the number of results from your paginator, you might want to run several batches instead, one per paginator page. This uses less memory, since you only buffer one page's worth of copy_object operations at a time. It also limits the concurrency a bit, which might perform better (or worse!):

    async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
        session = get_session()
    
        s3_client: S3Client
        async with session.create_client("s3") as s3_client:
            
            paginator = s3_client.get_paginator("list_objects")
            async for result in paginator.paginate(Bucket=bucket):
                awaitables: list[Awaitable[Any]] = []
    
                for file in result["Contents"]:
                    # the key name already includes the prefix
                    original_file_name: str = file["Key"]
                    logger.debug(f"Duplicating file: {original_file_name} ")
    
                    for _ in range(how_many_times):
                        new_file_name = original_file_name + "_" + str(uuid.uuid4())
                        copy_source = {"Bucket": bucket, "Key": original_file_name}
    
                        awaitable = s3_client.copy_object(
                            Bucket=bucket,
                            CopySource=copy_source,
                            Key=new_file_name,
                        )
                        awaitables.append(awaitable)
                        print("-", end="")
    
                await asyncio.gather(*awaitables)
    

    You can make even smaller batches, one per how_many_times group, too. This will have an even lower memory footprint (and even lower concurrency):

    async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
        session = get_session()
    
        s3_client: S3Client
        async with session.create_client("s3") as s3_client:
            
            paginator = s3_client.get_paginator("list_objects")
            async for result in paginator.paginate(Bucket=bucket):
                for file in result["Contents"]:
                    # the key name already includes the prefix
                    original_file_name: str = file["Key"]
                    logger.debug(f"Duplicating file: {original_file_name} ")
    
                    awaitables: list[Awaitable[Any]] = []
                    for _ in range(how_many_times):
                        new_file_name = original_file_name + "_" + str(uuid.uuid4())
                        copy_source = {"Bucket": bucket, "Key": original_file_name}
    
                        awaitable = s3_client.copy_object(
                            Bucket=bucket,
                            CopySource=copy_source,
                            Key=new_file_name,
                        )
                        awaitables.append(awaitable)
                        print("-", end="")
    
                    await asyncio.gather(*awaitables)
    

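    If you'd rather cap concurrency directly instead of indirectly through batch size, a semaphore can do that too. Here's a minimal sketch using only the standard library; the copy_with_limit helper and its copy_jobs list of (source_key, new_key) pairs are purely illustrative:

    import asyncio

    async def copy_with_limit(s3_client, bucket, copy_jobs, max_concurrency=10):
        semaphore = asyncio.Semaphore(max_concurrency)

        async def bounded_copy(source_key, new_key):
            async with semaphore:  # at most max_concurrency copies in flight at once
                await s3_client.copy_object(
                    Bucket=bucket,
                    CopySource={"Bucket": bucket, "Key": source_key},
                    Key=new_key,
                )

        await asyncio.gather(*(bounded_copy(src, dst) for src, dst in copy_jobs))
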
    I'd suggest benchmarking the different approaches to see what works best for you. Even the smallest-batch example, with the lowest concurrency, should perform better than your original code! 💪

    See also: the asyncio.gather() docs (https://docs.python.org/3/library/asyncio-task.html#asyncio.gather)