I am reading (using Polars/PyArrow) a directory full of Parquet files from Azure Blob Storage, totalling 60 GB, on a Standard_E8_v3 (8 cores, 64 GB RAM, 200 GB disk) compute instance.
After I have read the data I want to group it and collect the result, however on collecting the result I receive this error
I don't really understand what it is telling me:
Is it saying that it can't process my data because it is too big for the machine I am on?
Is there an error with the code?
Is there an error with the data I need to handle?
If anyone could highlight the issue, that would be much appreciated - it is important that any solution is a Polars-based one :)
Code below:
import pyarrow.dataset as ds
from azureml.fsspec import AzureMachineLearningFileSystem
import polars as pl
from azureml.core import Workspace
ws = Workspace.from_config()
# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name
datastore_name = 'datastore_name'
path_on_datastore = 'path_to_data'
# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}'
aml_fs = AzureMachineLearningFileSystem(uri)
files = aml_fs.glob(f'{path_on_datastore}/*.parquet')  # optional: list the parquet files (fsspec's glob() needs a path pattern; `files` is not used below)
myds=ds.dataset(path_on_datastore, filesystem=aml_fs, format="parquet")
df = (
    pl.scan_pyarrow_dataset(myds)
    .select([
        'COLUMN_LIST'
    ])
    #.with_columns(pl.col('turnovervalue').cast(pl.Float64, strict=False))
    .filter((pl.col('col1') > 0) & (pl.col('col2') >= 2022))
)

grouped = (
    df  # already a LazyFrame, so no .lazy() call is needed here
    .groupby(['colA', 'colB'])
    .agg([
        pl.n_unique('colC').alias('Blah'),
        pl.sum('colD').alias("BlahBlah"),
        pl.n_unique('colE').alias('BlahBlahBlah'),
        (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah'),
    ])
).collect()
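On the question of whether the data is simply too big for the machine: Polars does have a streaming engine that can spill during aggregation. Whether it helps here depends on whether a scan_pyarrow_dataset source supports it, so the variant below is only an untested sketch of the same query with streaming turned on.

# Untested sketch: same style of aggregation as above, asking Polars to use its
# streaming engine so the groupby does not have to hold everything in RAM.
grouped_streaming = (
    df
    .groupby(['colA', 'colB'])
    .agg([
        pl.n_unique('colC').alias('Blah'),
        pl.sum('colD').alias("BlahBlah"),
    ])
    .collect(streaming=True)  # streaming support varies by scan source
)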
EDIT:
I checked the schema of my Polars DataFrame and it outputs a sensible result, so I assume my connection to Azure is correct. I then went upstream to check whether what Polars is reading in from PyArrow is working, and it looks like it is a PyArrow issue rather than a Polars one. The snip below is from me just checking the head of the PyArrow dataset that I have got from Azure.
It seems that the data type it has inferred isn't the data type it is actually receiving when reading the data in, however I am unsure what the data at position 4 is (in the whole table) and am unsure how I am going to figure that out.
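One way to narrow that down (not something I have run end to end, just a sketch reusing `myds` from the code above) is to walk the dataset's parquet fragments and flag any file whose physical schema disagrees with the schema PyArrow inferred for the whole dataset:

# Sketch: find parquet files whose physical schema differs from the
# dataset-level schema that PyArrow inferred (reuses `myds` from above).
for frag in myds.get_fragments():
    if not frag.physical_schema.equals(myds.schema):
        print(frag.path)
        print(frag.physical_schema)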
I am going to change some of the tags and the title to hopefully align the new issue with the right people who can help.
I have my code working after changing how I access the data within Azure, so I am assuming the access method was the overarching issue.
Instead of using AzureMachineLearningFileSystem, I have turned to adlfs.AzureBlobFileSystem.
There is a bit more code involved to access all the correct credentials etc., but it isn't too verbose - and ultimately it is working :)
import pyarrow.dataset as ds
import polars as pl
import adlfs
from azureml.core import Workspace,Datastore
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential
# Acquire a credential object
credential = DefaultAzureCredential()
# Get Workspace
ws = Workspace.from_config()
# Get specific datastore
datastore = Datastore.get(ws,'datastore_name')
# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
account_name = datastore.account_name
container_name = datastore.container_name
path_on_datastore = f'{container_name}/path/to/data'
# Create a storage management client so we can look up the account key
storage_client = StorageManagementClient(credential, subscription)
# Retrieve the account's primary access key
keys = storage_client.storage_accounts.list_keys(resource_group, account_name)
key_to_access = keys.keys[0].value
# Configure the fsspec-compatible Azure Blob filesystem with the account key
fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=key_to_access)
dd = ds.dataset(path_on_datastore, filesystem=fs)
df = (
    pl.scan_pyarrow_dataset(dd)
    .select([
        'COLUMN_LIST'
    ])
    .filter((pl.col('col1') > 0) & (pl.col('col2') >= 2022))
)

grouped = (
    df  # already a LazyFrame, so no .lazy() call is needed here
    .groupby(['colA', 'colB'])
    .agg([
        pl.n_unique('colC').alias('Blah'),
        pl.sum('colD').alias("BlahBlah"),
        pl.n_unique('colE').alias('BlahBlahBlah'),
        (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah'),
    ])
).collect()
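If it helps anyone, a quick way to sanity-check the credentials and path before running the full query (a small, untested sketch reusing `fs`, `dd` and `path_on_datastore` from above):

# Quick sanity checks before collecting (reuses `fs` and `dd` from above)
print(fs.ls(path_on_datastore)[:5])  # should list the parquet files under the path
print(dd.schema)                     # schema PyArrow inferred across all fragments
print(dd.head(5).to_pandas())        # small peek at the first few rows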
References for help to others:
Pyarrow connection to Azure Blob - https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow
adlfs docs - https://github.com/fsspec/adlfs
Programmatically get blob connection string - How to programmatically retrieve the connection string from an Azure storage account in Python
I will accept my own answer for now, but if there is a better way of doing this then please feel free to post and I will change the acceptance.