Tags: python, asynchronous, boto3, python-asyncio

How to concurrently list_objects in an S3 bucket with aioboto3


I want to find all the unique paths in an S3 bucket; specifically, I want all the paths just before the file level. The depth of the directories can vary, so not all files are found at the same depth. For example, I might have these files:

data/subdir1/subdir2/file.csv
data/subdir1/subdir3/subdir4/subdir5/file2.csv
data/subdir6/subdir7/subdir8/file3.csv

and I want these directories:

data/subdir1/subdir2/
data/subdir1/subdir3/subdir4/subdir5/
data/subdir6/subdir7/subdir8/

I am using the code below to get them. I use an async for loop with the paginator because I thought the pages would be processed concurrently, but I am not sure they are. It seems very slow, so I think they are still processed serially:

    subfolders = set()

    paginator = self.s3_client.get_paginator("list_objects")

    async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for file in result.get("Contents", []):
            current_path = os.path.dirname(file.get("Key"))
            subfolders.add(current_path)
            print("Part Done")

    return subfolders

My s3_client is an aioboto3 client.

Is there a way to speed up the process of finding and saving the directories?

Note: I realized that this method does not bring back all the results, only those from the current page of the paginator. Can I fetch the next pages asynchronously?


Solution

  • I didn't find a way to parallelize over the objects returned for a single prefix, BUT I did parallelize across many initial prefixes, like this:

        subfolders = set()
        prefix_tasks = [self.get_subfolders(bucket, prefix) for prefix in prefixes]

        try:
            for prefix_future in asyncio.as_completed(prefix_tasks):
                prefix_subfolders = await prefix_future
                subfolders.update(prefix_subfolders)
        except KeyError as exc:
            print(f"Scanning origin bucket failed due to: {exc}")
            raise exc
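
    The prefixes list above is assumed to exist already. As a minimal sketch (not part of the original answer, and the helper name get_child_prefixes is illustrative), one way to build it is a single delimiter listing, which returns the next level of "directories" as CommonPrefixes:

        async def get_child_prefixes(s3_client, bucket: str, prefix: str = "") -> List[str]:
            # With a Delimiter, list_objects_v2 groups keys at the next "/" and
            # reports each group under "CommonPrefixes" instead of listing every key.
            response = await s3_client.list_objects_v2(
                Bucket=bucket, Prefix=prefix, Delimiter="/"
            )
            return [p["Prefix"] for p in response.get("CommonPrefixes", [])]

    For the example keys in the question, get_child_prefixes(s3_client, bucket, "data/") would return ["data/subdir1/", "data/subdir6/"], which can then be fanned out to get_subfolders concurrently. (A bucket with very many prefixes at that level could truncate this listing too, so treat it as a sketch.)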

    where my get_subfolders function is:

    async def get_subfolders(self, bucket: str, prefix: str) -> Set[str]:
        subfolders = set()

        result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        objects = result.get("Contents", [])
        subfolders.update(await self.get_paths_by_depth(objects=objects, depth=4))

        # Follow the continuation token to page through truncated results.
        while result["IsTruncated"]:
            result = await self.s3_client.list_objects_v2(
                Bucket=bucket,
                Prefix=prefix,
                ContinuationToken=result["NextContinuationToken"],
            )
            objects = result.get("Contents", [])
            subfolders.update(await self.get_paths_by_depth(objects=objects, depth=4))

        return subfolders
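
    As a side note, the depth=4 passed above is the number of "/" separators in the directory part of a key; that is what get_paths_by_depth (shown below) counts. For the example keys from the question:

        import os

        key = "data/subdir1/subdir3/subdir4/subdir5/file2.csv"
        print(os.path.dirname(key))             # data/subdir1/subdir3/subdir4/subdir5
        print(os.path.dirname(key).count("/"))  # 4 -> kept when depth=4

        key = "data/subdir1/subdir2/file.csv"
        print(os.path.dirname(key).count("/"))  # 2 -> skipped when depth=4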

    and my get_paths_by_depth() function is:

    async def get_paths_by_depth(self, objects: List[dict], depth: int) -> Set[str]:
        subfolders = set()
        try:
            # Keep only the directory paths that sit exactly 'depth' levels deep.
            for bucket_object in objects:
                current_path = os.path.dirname(bucket_object["Key"])
                if current_path.count("/") == depth:
                    subfolders.add(current_path)

        except Exception as exc:
            print(f"Getting subfolders failed due to error: {exc}")
            raise exc

        return subfolders
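
    Both snippets assume that self.s3_client is an aioboto3 client. For completeness, a minimal sketch of how such a client is typically obtained (the bucket name and prefix are placeholders, and this driver is not part of the original answer):

        import asyncio

        import aioboto3

        async def main() -> None:
            session = aioboto3.Session()
            # aioboto3 clients are async context managers; S3 calls made on
            # s3_client inside this block are awaitable.
            async with session.client("s3") as s3_client:
                response = await s3_client.list_objects_v2(
                    Bucket="my-bucket", Prefix="data/", MaxKeys=5
                )
                for bucket_object in response.get("Contents", []):
                    print(bucket_object["Key"])

        asyncio.run(main())

    The object that holds get_subfolders should be constructed inside that async with block, so the client stays open for the lifetime of the prefix tasks.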