Search code examples
pythonmemoryzipgoogle-cloud-storagedask

Python: How to Extract Zip-Files in Google Cloud Storage Without Running Out of Memory?


I need to extract the files in a zip file in Google Cloud Storage. I'm using a python function to do this, but I keep running into memory issues even when using a Dask Cluster and each Dask worker has a 20GB memory limit.

How could I optimize my code so that it doesn't consume as much memory? Perhaps reading the zip file in chunks and streaming them to a temporary file and then sending this file to Google Cloud Storage?

Would appreciate any guidance here.

Here is my code:

@task
def unzip_files(
    bucket_name,
    zip_data
):
    file_date = zip_data['file_date']
    gcs_folder_path = zip_data['gcs_folder_path']
    gcs_blob_name = zip_data['gcs_blob_name']

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    destination_blob_pathname = f'{gcs_folder_path}/{gcs_blob_name}'
    blob = bucket.blob(destination_blob_pathname)
    zipbytes = io.BytesIO(blob.download_as_string())

    if is_zipfile(zipbytes):
        with ZipFile(zipbytes, 'r') as zipObj:
            extracted_file_paths = []
            for content_file_name in zipObj.namelist():
                content_file = zipObj.read(content_file_name)
                extracted_file_path = f'{gcs_folder_path}/hgdata_{file_date}_{content_file_name}'
                blob = bucket.blob(extracted_file_path)
                blob.upload_from_string(content_file)
                extracted_file_paths.append(f'gs://{bucket_name}/{extracted_file_path}')
        return extracted_file_paths

    else:
        return []

Solution

  • I do not quite follow your code, but in general, dask plays nicely with complex file operations like this, using the fsspec and gcsfs libraries. For example (and you don't need Dask for this)

    import fsspec
    
    with fsspec.open_files("zip://*::gcs://gcs_folder_path/gcs_blob_name") as open_files:
        for of in open_files:
            with fsspec.open("gcs://{something from fo}", "wb") as f:
                data = True
                while data:
                    data = of.read(2**22)
                    f.write(data)
    

    You could instead do

    open_files = fssec.open_files(...)
    

    and parallelise the loop with Dask.