I want to find all the unique paths in an S3 bucket, I want all the paths just before the file level. The depth of the directories can vary so not all files are found in the same depth for example I might have these files :
data/subdir1/subdir2/file.csv
data/subdir1/subdir3/subdir4/subdir5/file2.csv
data/subdir6/subdir7/subdir8/file3.csv
and I want these directories:
data/subdir1/subdir2/
data/subdir1/subdir3/subdir4/subdir5/
data/subdir6/subdir7/subdir8/
I am using the code below to get them. I use an async for
loop with paginator cause I though they would be processed concurrently but I am not sure they are. It seems very slow so I think they are still done in series:
subfolders = set()
current_path = None
paginator = self.s3_client.get_paginator("list_objects")
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for file in result.get("Contents", []):
current_path = os.path.dirname(file.get("Key"))
if current_path not in subfolders:
subfolders.add(current_path)
print(f"Part Done")
return subfolders
My s3_client is an aioboto3
client.
Is there a way to speed up the process of finding and saving the directories ?
Note: I realized that this method does not bring me all the results only those from the current paginator can I get the next paginator asynchronously?
I didn't find a way to parallelize on the returned objects BUT I parallelized using many initial prefixes like this:
subfolders = set()
prefix_tasks = [get_subfolders(bucket, prefix) for prefix in prefixes]
try:
for prefix_future in asyncio.as_completed(prefix_tasks):
prefix_subfolders = await prefix_future
subfolders.update(prefix_subfolders)
except KeyError as exc:
print(f"Scanning origin bucket failed due to: {exc}")
raise exc
where my get_subfolders
function is:
async def get_subfolders(self, bucket: str, prefix: str) -> List[str]:
subfolders = set()
result = await self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
objects = result.get("Contents")
subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))
# Use next continuation token for pagination for truncated results.
while result["IsTruncated"]:
result = await self.s3_client.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
ContinuationToken=result["NextContinuationToken"],
)
objects = result.get("Contents")
subfolders.update(await self._get_paths_by_depth(objects=objects, depth=4))
return subfolders
and my get_paths_by_depth()
function is :
async def get_paths_by_depth(self, objects: dict, depth: int) -> Set[str]:
subfolders = set()
current_path = None
try:
# Get only paths with depth equal to 'depth' levels
for bucket_object in objects:
current_path = os.path.dirname(bucket_object["Key"])
if current_path.count("/") == depth:
subfolders.add(current_path)
except Exception as exc:
print(f"Getting subfolders failed due to error: {exc}")
raise exc
return subfolders