I have the following partition strategy in an ADLS Gen2 store:
dir_parquet = "abfss://blah.windows.net/container_name/project=cars/make=*/model=*/*.parquet"
This loads the already partitioned data into a dataframe. I am aware of using `.filepath(n)`
in SQL to achieve this; I effectively need the same thing, but in a notebook dataframe.
How can I keep the `project`, `make` and `model` values in the dataframe as separate columns?
According to this other SO thread, setting `.option("mergeSchema", "true")`
on read should work; however, it did not.
Thanks.
Since I received no answer to this and cannot find an official means to do so, I wrote the below code.
People with this problem may also find it useful to list blob directories recursively; if so, please see the `deep_ls`
function here (not my code).
import pyspark
import pyspark.sql.functions as F
from typing import List
def _parse_partition_url(url: str, file_format: str = None) -> dict:
    """Split one partitioned URL into its read format and ``key=value`` partition steps."""
    if not isinstance(url, str):
        raise TypeError('dir_urls must be a list of string values')

    segments = url.rstrip('/').split('/')
    last_segment = segments[-1]

    if not file_format:
        # Infer the format from the extension of the final path segment only.
        # Splitting the whole URL on '.' would pick up the dots in the host
        # name, and a trailing ``key=value`` segment is a partition directory
        # (its value may itself contain a dot), not a file name.
        _, dot, extension = last_segment.rpartition('.')
        if '=' in last_segment or not dot or not extension:
            raise ValueError('file_format must not be none, the URL must end in the file format (.parquet, .csv, etc)')
        file_format = extension

    partition_steps = {}
    for segment in segments:
        # partition() splits on the first '=' only, so values containing '='
        # are kept whole
        key, separator, value = segment.partition('=')
        if separator and key:
            partition_steps[key] = value

    if not partition_steps:
        raise ValueError('The list of URLs must contain the partition steps, ex. make=ford')

    return {'url': url, 'file_format': file_format, 'partition_steps': partition_steps}


def load_dataframes_with_partition_steps(dir_urls: List[str], file_format: str = None) -> "List[pyspark.sql.dataframe.DataFrame]":
    """
    Load one dataframe per URL and add the URL's partition steps as literal columns.

    Originally written by: Paul Wilson, 2022-07-29

    Every path segment of the form ``key=value`` is treated as a partition step
    and becomes a column named ``key`` holding the constant string ``value``.

    Ex. input...:
        ['abfss://<container>@<account>.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/*.parquet',
         'abfss://<container>@<account>.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/*.parquet']
    ...is parsed into one entry per URL...
        [{'url': '<first url>', 'file_format': 'parquet',
          'partition_steps': {'make': 'Vauxhall', 'model': 'Astra'}},
         {'url': '<second url>', 'file_format': 'parquet',
          'partition_steps': {'make': 'Ford', 'model': 'Fiesta'}}]
    ...and from those entries a list of dataframes, in the same order as dir_urls:
        [df1, df2, ..., dfn]

    Parameters:
        dir_urls: list of URLs that include their partition steps.
        file_format: optional format passed to ``spark.read.format`` for every
            URL. When omitted, the format is taken from the extension of each
            URL's last path segment (``.parquet``, ``.csv``, etc). Supply it
            when the URLs point at directories rather than files.

    Returns:
        A list with one dataframe per URL; an empty list for an empty input.

    Raises:
        TypeError: dir_urls is not a list, or one of its items is not a string.
        ValueError: a URL has no partition steps, or no file format was given
            and none can be inferred from the URL.
    """
    if not isinstance(dir_urls, list):
        raise TypeError('dir_urls must be a list of string values')

    # Parse and validate every URL before reading anything, so that a bad
    # entry fails fast instead of after earlier URLs have already been loaded.
    parsed_urls = [_parse_partition_url(url, file_format) for url in dir_urls]

    # NOTE(review): Spark's own partition discovery (the ``basePath`` read
    # option) may make this helper unnecessary -- confirm against the Spark docs.
    list_dfs = []
    for parsed in parsed_urls:
        # ``spark`` is not defined in this block; presumably the notebook
        # provides the active SparkSession under that name -- verify.
        df = spark.read.format(parsed['file_format']).load(parsed['url'])
        for key, value in parsed['partition_steps'].items():
            df = df.withColumn(key, F.lit(value))
        list_dfs.append(df)

    return list_dfs