Tags: pyspark, databricks, azure-databricks

How to specify schema for the folder structure when reading parquet file into a dataframe


I have to read parquet files that are stored in the following folder structure: /yyyy/mm/dd/ (e.g. 2021/01/31).

If I read the files like this, it works:

unPartitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/*/*/*/*.parquet")

Unfortunately, the folder structure is not stored in the typical partitioned format /yyyy=2021/mm=01/dd=31/ and I don't have the luxury of converting it to that format.

I was wondering if there is a way I can provide Spark a hint as to the folder structure so that it would make "2021/01/31" available as yyyy, mm, dd in my dataframe.

I have another set of files, which are stored in the /yyyy=aaaa/mm=bb/dd=cc format, and the following code works:

partitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/")
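
With that layout, partition discovery picks up yyyy, mm and dd from the folder names automatically. As a quick check (for reference only), the schema shows them alongside the data columns:

partitionedDF.printSchema()  # yyyy, mm, dd are listed together with the data columns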

Things I have tried

I have specified the schema, but it just returned nulls:

from pyspark.sql.types import StructType, StructField, LongType, TimestampType

customSchema = StructType([
  StructField("yyyy", LongType(), True),
  StructField("mm", LongType(), True),
  StructField("dd", LongType(), True),
  StructField("id", LongType(), True),
  StructField("a", LongType(), True),
  StructField("b", LongType(), True),
  StructField("c", TimestampType(), True)])

partitionDF = spark.read.option("mergeSchema", "true").schema(customSchema).parquet("abfss://[email protected]/Address/")
display(partitionDF)

The above returns no data! If I change the path to "abfss://[email protected]/Address/*/*/*/*.parquet", then I get data, but the yyyy, mm, dd columns are empty.

Another option would be to load the folder path as a column, but I can't seem to find a way to do that.

TIA

Databricks N00B!


Solution

  • I suggest you load the data without the partition folders, as you already did:

    unPartitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/*/*/*/*.parquet")
    

    Then add a column with the value of the input_file_name function:

    import pyspark.sql.functions as F
    unPartitionedDF = unPartitionedDF.withColumn('file_path', F.input_file_name())
    

    Then you could split the values of the new file_path column into three separate columns.

    df = unPartitionedDF.withColumn('year',  F.split(F.col('file_path'), '/').getItem(3)) \
                        .withColumn('month', F.split(F.col('file_path'), '/').getItem(4)) \
                        .withColumn('day',   F.split(F.col('file_path'), '/').getItem(5))
    

    The index passed to getItem depends on the exact folder structure you have.
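
    If you are unsure which indices to use, you could print one value of the new file_path column first and count the '/'-separated segments (a quick check, using the column added above):

    # Show one full path so you can pick the right getItem indices
    # for year, month and day in your folder structure.
    unPartitionedDF.select('file_path').show(1, truncate=False)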

    I hope this resolves your problem.
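
    Alternatively, if the only guarantee is that the path contains a /yyyy/mm/dd/ segment somewhere, a regexp_extract-based sketch would not depend on the segment positions at all. This assumes the year is 4 digits and month/day are 2 digits; adjust the pattern to your structure:

    # Pull the year, month and day out of the path, regardless of how many
    # folders precede them (assumes a /yyyy/mm/dd/ segment in the path).
    df = (unPartitionedDF
          .withColumn('year',  F.regexp_extract('file_path', r'/(\d{4})/(\d{2})/(\d{2})/', 1))
          .withColumn('month', F.regexp_extract('file_path', r'/(\d{4})/(\d{2})/(\d{2})/', 2))
          .withColumn('day',   F.regexp_extract('file_path', r'/(\d{4})/(\d{2})/(\d{2})/', 3)))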