I need to extract the files inside a zip file stored in Google Cloud Storage. I'm using a Python function to do this, but I keep running into memory issues, even on a Dask cluster where each Dask worker has a 20 GB memory limit.
How could I optimize my code so that it doesn't consume as much memory? Perhaps by reading the zip file in chunks, streaming them to a temporary file, and then sending that file to Google Cloud Storage?
Would appreciate any guidance here.
Here is my code:
import io
from zipfile import ZipFile, is_zipfile

from google.cloud import storage


@task
def unzip_files(
    bucket_name,
    zip_data
):
    file_date = zip_data['file_date']
    gcs_folder_path = zip_data['gcs_folder_path']
    gcs_blob_name = zip_data['gcs_blob_name']

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # The whole archive is downloaded into memory here.
    destination_blob_pathname = f'{gcs_folder_path}/{gcs_blob_name}'
    blob = bucket.blob(destination_blob_pathname)
    zipbytes = io.BytesIO(blob.download_as_string())

    if is_zipfile(zipbytes):
        with ZipFile(zipbytes, 'r') as zipObj:
            extracted_file_paths = []
            for content_file_name in zipObj.namelist():
                # Each member is also read fully into memory before upload.
                content_file = zipObj.read(content_file_name)
                extracted_file_path = f'{gcs_folder_path}/hgdata_{file_date}_{content_file_name}'
                blob = bucket.blob(extracted_file_path)
                blob.upload_from_string(content_file)
                extracted_file_paths.append(f'gs://{bucket_name}/{extracted_file_path}')
        return extracted_file_paths
    else:
        return []
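To make the temp-file idea above concrete, this is roughly what I was picturing (an untested sketch; unzip_via_tempfile, dest_prefix, etc. are placeholder names, not code I'm running):

import tempfile
from zipfile import ZipFile

from google.cloud import storage


def unzip_via_tempfile(bucket_name, source_blob_name, dest_prefix):
    """Spill the zip to local disk, then stream each member back to GCS."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    extracted = []
    with tempfile.NamedTemporaryFile(suffix=".zip") as tmp:
        # Download to the worker's disk instead of holding the archive in memory.
        bucket.blob(source_blob_name).download_to_file(tmp)
        tmp.flush()

        with ZipFile(tmp.name, "r") as zf:
            for member in zf.namelist():
                if member.endswith("/"):  # skip directory entries
                    continue
                dest_blob = bucket.blob(f"{dest_prefix}/{member}")
                # upload_from_file streams from the open member handle,
                # so the member is never read into memory all at once.
                with zf.open(member, "r") as member_fh:
                    dest_blob.upload_from_file(
                        member_fh, size=zf.getinfo(member).file_size
                    )
                extracted.append(f"gs://{bucket_name}/{dest_prefix}/{member}")
    return extracted

Is something along these lines the right direction, or is there a better way to avoid the download-everything-into-memory step?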
I do not quite follow your code, but in general, Dask plays nicely with complex file operations like this via the fsspec and gcsfs libraries. For example (and you don't need Dask for this):
import fsspec

# "zip://*::gcs://..." chains the zip filesystem on top of gcsfs, so the
# archive members are read lazily instead of being downloaded in one go.
with fsspec.open_files("zip://*::gcs://gcs_folder_path/gcs_blob_name") as open_files:
    for of in open_files:
        # Build the destination path from the member being copied
        # ({something from of} is whatever target naming scheme you want).
        with fsspec.open("gcs://{something from of}", "wb") as f:
            data = True
            while data:
                data = of.read(2**22)  # copy in 4 MiB chunks
                f.write(data)
You could instead do

open_files = fsspec.open_files(...)

and parallelise the loop with Dask.
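As a rough sketch of that last point (assuming a Dask client is already connected; the target bucket/prefix below are placeholders), you can list the member names up front and have each Dask task re-open its own member via a chained URL, which avoids shipping open file handles to the workers:

import dask
import fsspec


@dask.delayed
def copy_member(member, source_zip_url, target_prefix):
    # Re-open just this member on the worker and stream it to GCS in chunks.
    src_url = f"zip://{member}::{source_zip_url}"
    with fsspec.open(src_url, "rb") as src, fsspec.open(f"{target_prefix}/{member}", "wb") as dst:
        while True:
            chunk = src.read(2**22)  # 4 MiB at a time
            if not chunk:
                break
            dst.write(chunk)
    return f"{target_prefix}/{member}"


source_zip_url = "gcs://gcs_folder_path/gcs_blob_name"
target_prefix = "gcs://bucket_name/extracted"  # placeholder destination

# Only member names are sent to the workers, not open file objects.
members = [of.path for of in fsspec.open_files(f"zip://*::{source_zip_url}")]
extracted = dask.compute(*[copy_member(m, source_zip_url, target_prefix) for m in members])

Each task pays the cost of re-reading the zip's central directory from GCS, but in exchange the tasks stay self-contained and trivially serialisable.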