apache-spark-sql, azure-synapse, azure-data-lake-gen2

Include partition steps as columns when reading a Synapse Spark dataframe


I have the following partition strategy in an ADLS Gen2 store

dir_parquet = "abfss://blah.windows.net/container_name/project=cars/make=*/model=*/*.parquet"

This loads the already-partitioned data into a dataframe. I am aware of using .filepath(n) in SQL to achieve this; I effectively need the same thing, but in a notebook dataframe.

How can I keep the project, make and model values in the dataframe as separate columns?

According to this other SO thread, setting .option("mergeSchema", "true") on read should work; however, it did not.
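For reference, the attempted read was roughly the following (a sketch; spark here is the session the Synapse notebook provides):

df = spark.read.option("mergeSchema", "true").parquet(dir_parquet)

The data loads, but the project, make and model values do not appear as columns.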

Thanks.


Solution

  • Since I received no answer to this and could not find an official way to do it, I wrote the code below.

    People with this problem may also find it useful to recursively list blob directories; if so, see the deep_ls function here (not my code). A usage sketch follows the code below.

    import pyspark
    import pyspark.sql.functions as F
    from typing import List
    
    def load_dataframes_with_partition_steps(dir_urls:List[str]) -> List[pyspark.sql.dataframe.DataFrame]:
        """
        Written by: Paul Wilson, 2022-07-29
    
        Takes in a list of blob directories including their partition steps and returns a list of dataframes with the
        associated partition steps applied as columns in each dataframe.
    
        Ex. input...:
            ['abfss://container@account.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/*.parquet',
                'abfss://container@account.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/*.parquet']
    
        ...which is turned into a list of dicts...
            [{'url': 'abfss://container@account.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/*.parquet',
            'file_format': 'parquet',
            'make': 'Vauxhall',
            'model': 'Astra',
            'transmission': 'Manual'},
            {'url': 'abfss://container@account.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/*.parquet',
            'file_format': 'parquet',
            'make': 'Ford',
            'model': 'Fiesta',
            'transmission': 'Automatic'}]
    
        ...and from that, a list of dataframes, one per url with its partition steps applied, such as:
            [df1, df2, ..., dfn]
        """
    
        def load_dataframe(url:str=None, partition_steps:dict=None, file_format:str=None, df:pyspark.sql.dataframe.DataFrame=None) -> pyspark.sql.dataframe.DataFrame:
            """
            Recursively load a dataframe and apply the partition steps via withColumn
            """
            # avoid a mutable default argument for partition_steps
            if partition_steps is None:
                partition_steps = {}
    
            if file_format is None or len(file_format) == 0:
                raise ValueError('file_format must not be None; the URL must end in the file format (.parquet, .csv, etc)')
    
            # if there is a url and non-empty partition steps without a df loaded then load the dataframe
            if (url is not None and len(partition_steps.keys()) > 0 and df is None):
                df = spark.read.format(file_format).load(url)
    
                # df is now loaded, so drop the url on the next call; keep passing file_format so the check above still passes
                return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)
    
            # if here then the df is loaded and proceed to apply withColumn
            if (url is None and df is not None and len(partition_steps.keys()) > 0):
    
                # take the first item in the partition steps dict
                key = list(partition_steps.keys())[0]
                value = partition_steps[key]
    
                # remove that item from the partition steps dict
                partition_steps.pop(key)
    
                # add the partition step as a literal column and recurse on the remaining steps
                df = df.withColumn(key, F.lit(value))
                return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)
    
            # if it makes it here then the dataframe is loaded and the partition steps are applied
            return df
    
        # list of dataframe dict values of url and partition steps
        list_df_dicts = list()
    
        if not isinstance(dir_urls, list):
            raise TypeError('dir_urls must be a list of string values')
    
        # iterate over all urls and generate dict of partition values
        for url in dir_urls:
    
            # dict to store url and partition steps
            d_dict = dict()
            d_dict['url'] = url
    
            # get the format from the last part of the url
            file_format = url.split('.')[-1]
            d_dict['file_format'] = file_format
    
            # split the url keeping only partition steps (ex. make=Vauxhall)
            url_split = [u for u in d_dict['url'].split('/') if '=' in u]
    
            if len(url_split) == 0:
                raise ValueError('The list of URLs must contain the partition steps, ex. make=ford')
    
            # turn the partition=item into a key:value
            partition_items = [u.split('=') for u in url_split]
    
            # iterate over every item in partition_items=[['key', 'value']] and set dict[key] = value
            for item in partition_items:
                key = item[0]
                value = item[1]
                d_dict[key] = value
    
            list_df_dicts.append(d_dict)
    
        # iterate over all the dicts and load the dataframes to a list with their partition steps in place
        list_dfs = list()
        for d_dict in list_df_dicts:
    
            # get the url from the d_dict
            url = d_dict['url']
    
            # get the format
            file_format = d_dict['file_format']
    
            # remove the url and file format from the d_dict so only partition steps remain
            d_dict.pop('url')
            d_dict.pop('file_format')
            df = load_dataframe(url=url, partition_steps=d_dict, file_format=file_format)
            list_dfs.append(df)
    
        # return the list of dataframes
        return list_dfs
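
    A minimal usage sketch follows. The container, account and file names are placeholders, and building the list of partition directories with deep_ls (linked above) or mssparkutils.fs.ls is an assumption rather than part of the code above:

    # hypothetical partition directory URLs; each ends in a *.parquet glob so the
    # file format can be derived from the final '.' in the URL
    dir_urls = [
        'abfss://container@account.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/*.parquet',
        'abfss://container@account.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/*.parquet'
    ]
    
    dfs = load_dataframes_with_partition_steps(dir_urls)
    
    # each dataframe now carries make, model and transmission as literal columns;
    # union them by name into one dataframe if the schemas line up
    from functools import reduce
    cars = reduce(lambda a, b: a.unionByName(b), dfs)
    cars.select('make', 'model', 'transmission').distinct().show()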