azure, azure-blob-storage, python-asyncio

How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?


Following this question: How to read parquet files from Azure Blobs into Pandas DataFrame? I wanted to add concurrency by downloading multiple files "in parallel" using asyncio.

I'm stuck on how to use the TaskGroup feature of Python 3.11 to start my tasks and wait for them to complete. How can I retrieve a list of the downloaded streams?

My code so far:

import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO

class BlobStorageAsync:
    def __init__(self, connection_string, container_name, logging_enable):
        self.connection_string = connection_string
        self.container_name = container_name
        container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
            # This client will log detailed information about its HTTP sessions, at DEBUG level
            logging_enable=logging_enable
        )
        self.container_client = container_client

    async def list_blobs_in_container_async(self, name_starts_with):
        blobs_list = []
        async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
            blobs_list.append(blob)
        return blobs_list

    async def download_blob_async(self, blob_name):
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            stream = await blob_client.download_blob()
            data = await stream.readall() # data returned as bytes-like object
        # return data as bytes (in-memory binary stream)
        return BytesIO(data)

    async def download_blobs_async(self, blobs_list):
        tasks = []
        for blob_name in blobs_list:
            task = asyncio.create_task(self.download_blob_async(blob_name))
            tasks.append(task)
        await asyncio.gather(*tasks)

Now I am stuck because I am not able to create and run the tasks and wait for the results; for example, this does not work:

async def main():
    blobs_list = ...
    connection_string = ...
    container_name = ...
    BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
    result = asyncio.run(BSA.download_blobs_async(blobs_list))
    # process the result: read the first stream and print it, for instance
    df = pd.read_parquet(result[0])
    print(df)

if __name__ == '__main__':
    try:
        main()
    except Exception as ex:
        print(ex)

Solution

  • How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?

    You can use the code below to read parquet files from Azure Blob Storage into Pandas DataFrames concurrently with asyncio. The key changes from your version are that download_blobs_async returns the results of asyncio.gather, main awaits that coroutine instead of calling asyncio.run inside it, asyncio.run(main()) is called once at the top level, and the container client is closed in a finally block once the downloads are done.

    Code:

    import asyncio
    import pandas as pd
    from azure.storage.blob.aio import ContainerClient
    from io import BytesIO
    
    
    class BlobStorageAsync:
        def __init__(self, connection_string, container_name):
            self.connection_string = connection_string
            self.container_name = container_name
            container_client = ContainerClient.from_connection_string(
                conn_str=connection_string,
                container_name=container_name,
            )
            self.container_client = container_client
    
        async def list_blobs_in_container_async(self, name_starts_with):
            blobs_list = []
            async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
                blobs_list.append(blob)
            return blobs_list
    
        async def download_blob_async(self, blob_name):
            blob_client = self.container_client.get_blob_client(blob=blob_name)
            async with blob_client:
                stream = await blob_client.download_blob()
                data = await stream.readall() # data returned as bytes-like object
            # return data as bytes (in-memory binary stream)
            return BytesIO(data)
    
        async def download_blobs_async(self, blobs_list):
            tasks = []
            for blob_name in blobs_list:
                task = asyncio.create_task(self.download_blob_async(blob_name))
                tasks.append(task)
            results = await asyncio.gather(*tasks)
            # return the list of downloaded streams
            return results
    
    async def main():
        blobs_list = ["pqt_file4", "pqt_file5"]
        connection_string = ""
        container_name = "test1"
        BSA = BlobStorageAsync(connection_string, container_name)
        try:
            results = await BSA.download_blobs_async(blobs_list)
            for stream in results:
                df = pd.read_parquet(stream, engine="pyarrow")
                print(df)
        finally:
            await BSA.container_client.close()
    
    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except Exception as ex:
            print(ex)
    

    Output:

      id       name
    0  1       Kala
    1  2  Arulmozhi
    2  6   Rajaraja
      id       name
    0  1     Aditha
    1  2  Arulmozhi
    2  3   Kundavai
    3  6   Rajaraja
    

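    If you specifically want the TaskGroup feature of Python 3.11 mentioned in the question, the same download coroutine can be driven with asyncio.TaskGroup instead of asyncio.gather. This is a minimal sketch assuming the BlobStorageAsync class above; download_blobs_taskgroup_async is just an illustrative helper name:

    import asyncio

    async def download_blobs_taskgroup_async(bsa, blobs_list):
        # the TaskGroup waits for all of its tasks when the async-with block exits
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(bsa.download_blob_async(name)) for name in blobs_list]
        # every task has finished here; collect the BytesIO streams in order
        return [task.result() for task in tasks]

    The main behavioral difference from asyncio.gather is error handling: if one download fails, the TaskGroup cancels the remaining tasks and raises the failures together as an ExceptionGroup.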