Tags: python, azure-blob-storage, azure-machine-learning-service, pyarrow

Pyarrow error: Tried reading X bytes starting at position X from file but only got X


I am reading (using Polars/PyArrow) a directory of Parquet files totalling 60 GB from Azure Blob Storage, on a Standard_E8_v3 (8 cores, 64 GB RAM, 200 GB disk) compute instance.

After reading the data I want to group it and collect the result; however, when I collect the result I receive this error:

[Screenshot of the error traceback, ending with: Tried reading X bytes starting at position X from file but only got X]

I don't really understand what it is telling me:

Is it saying that it can't process my data because it is too big for the machine I am on?

Is there an error with the code?

Is there an error with the data I need to handle?

If anyone could highlight the issue, that would be much appreciated - it is important that any solution is Polars-based :)

Code below:

import pyarrow.dataset as ds
from azureml.fsspec import AzureMachineLearningFileSystem
import polars as pl
from azureml.core import Workspace

ws = Workspace.from_config()

# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name
datastore_name = 'datastore_name'
path_on_datastore = 'path_to_data'

# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}'

aml_fs = AzureMachineLearningFileSystem(uri)
files = aml_fs.glob(f'{path_on_datastore}/*.parquet')  # list the parquet files under the path (not used further below)

myds=ds.dataset(path_on_datastore, filesystem=aml_fs, format="parquet")
df = (
    pl.scan_pyarrow_dataset(myds)
    .select([
        'COLUMN_LIST'
    ])
    #.with_columns(pl.col('turnovervalue').cast(pl.Float64, strict=False))
    .filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
)

grouped = (df.lazy()
    .groupby(['colA','colB'])
    .agg(
        [
            pl.n_unique('colC').alias('Blah'),
            pl.sum('colD').alias("BlahBlah"),
            pl.n_unique('colE').alias('BlahBlahBlah'),
            (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
        ]
    )
).collect()

EDIT:

I checked the schema of my Polars dataframe and it outputs a sensible result, so I assume my connection to Azure is correct. I then went upstream to check whether what Polars is reading from PyArrow is working, and it looks like it is a PyArrow issue rather than a Polars one. The snip below is from me just checking the head of the PyArrow dataset that I got from Azure.

[Screenshot: checking the head of the PyArrow dataset read from Azure]

My assumption is that the data type PyArrow has inferred isn't the data type it is actually receiving when reading the data in; however, I am unsure what the data at position 4 is (in the whole table), and unsure how I am going to figure that out.
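
One way to narrow this down might be to bypass Polars entirely and probe each Parquet fragment of the PyArrow dataset individually, to see which file raises the read error. A rough sketch (using the myds variable from the code above; untested against the actual datastore):

# Probe each parquet fragment to find the file(s) that fail to read.
# myds is the pyarrow dataset created with ds.dataset(...) above.
print(myds.schema)  # the schema is inferred from one file's footer, so it can look fine even if another file is bad

for frag in myds.get_fragments():
    try:
        frag.head(1)  # forces an actual read of this fragment
    except OSError as err:
        print(f"Problem fragment: {frag.path} -> {err}")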

I am going to change some of the tags and the title to hopefully get the new issue in front of the right people that can help.


Solution

  • I have my code working after changing how I access the data within Azure, so I am assuming that the access method was the overarching issue.

    Instead of using the AzureMachineLearningFileSystem I have turned to adlfs.AzureBlobFileSystem.

    There is a bit more code involved to get all the correct credentials etc., but it isn't too verbose - and ultimately it is working :)

    import pyarrow.dataset as ds
    import polars as pl
    import adlfs
    from azureml.core import Workspace,Datastore
    from azure.mgmt.storage import StorageManagementClient
    from azure.identity import DefaultAzureCredential
    
    # Acquire a credential object
    credential = DefaultAzureCredential()
    # Get Workspace
    ws = Workspace.from_config()
    # Get specific datastore
    datastore = Datastore.get(ws,'datastore_name')
    
    # Azure Machine Learning workspace details:
    subscription = ws.subscription_id
    resource_group = ws.resource_group
    account_name = datastore.account_name  # the storage account backing the datastore
    container_name = datastore.container_name
    path_on_datastore = f'{container_name}/path/to/data'
    
    # Provision the storage account, starting with a management object.
    storage_client = StorageManagementClient(credential, subscription)
    
    # Retrieve the account's primary access key
    keys = storage_client.storage_accounts.list_keys(resource_group, account_name)
    key_to_access = keys.keys[0].value
    
    # ... load your credentials and configure the filesystem
    fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=key_to_access)
    
    dd = ds.dataset(path_on_datastore, filesystem=fs)
    
    df = (
        pl.scan_pyarrow_dataset(dd)
        .select([
            'COLUMN_LIST'
        ])
        .filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
    )
    
    grouped = (df.lazy()
        .groupby(['colA','colB'])
        .agg(
            [
                pl.n_unique('colC').alias('Blah'),
                pl.sum('colD').alias("BlahBlah"),
                pl.n_unique('colE').alias('BlahBlahBlah'),
                (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
            ]
        )
    ).collect()
    
    

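    As an aside, adlfs also appears to accept an azure.identity credential directly, which would avoid pulling the account key through the management client. I haven't tested this route, but a minimal sketch (assuming the compute's identity has a data-plane role such as Storage Blob Data Reader on the storage account) would look like:

    import adlfs
    from azure.identity import DefaultAzureCredential

    # Untested alternative: authenticate with a token credential instead of the account key.
    # Assumes the identity has e.g. Storage Blob Data Reader on the storage account.
    fs = adlfs.AzureBlobFileSystem(
        account_name=account_name,
        credential=DefaultAzureCredential(),
    )
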
    References for help to others:

    Pyarrow connection to Azure Blob - https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow

    adlfs docs - https://github.com/fsspec/adlfs

    Programmatically get blob connection string - How to programmatically retrieve the connection string from an Azure storage account in Python

    I will accept my own answer for now, but if there is a better way of doing this then please feel free to post it and I will change the acceptance.