I have to read parquet files that are stored in the following folder structure: /yyyy/mm/dd/ (e.g. 2021/01/31).
If I read the files like this, it works:
unPartitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/*/*/*/*.parquet")
Unfortunately, the folder structure is not stored in the typical partitioned format /yyyy=2021/mm=01/dd=31/ and I don't have the luxury of converting it to that format.
I was wondering if there is a way I can provide Spark a hint as to the folder structure so that it would make "2021/01/31" available as yyyy, mm, dd in my dataframe.
I have another set of files, which are stored in the /yyyy=aaaa/mm=bb/dd=cc format and the following code works:
partitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/")
Things I have tried
I have specified the schema, but it just returned nulls:
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

customSchema = StructType([
    StructField("yyyy", LongType(), True),
    StructField("mm", LongType(), True),
    StructField("dd", LongType(), True),
    StructField("id", LongType(), True),
    StructField("a", LongType(), True),
    StructField("b", LongType(), True),
    StructField("c", TimestampType(), True)])
partitionDF = spark.read.option("mergeSchema", "true").schema(customSchema).parquet("abfss://[email protected]/Address/")
display(partitionDF)
The above returns no data! If I change the path to "abfss://[email protected]/Address/*/*/*/*.parquet", then I get data, but the yyyy, mm, dd columns are empty.
Another option would be to load the folder path as a column, but I can't seem to find a way to do that.
TIA
Databricks N00B!
Spark only infers partition columns from Hive-style key=value folder names, so with a plain /yyyy/mm/dd/ layout the yyyy, mm, dd fields in your customSchema have nothing to bind to and come back null. I suggest you load the data without relying on partition discovery, as you mentioned:
unPartitionedDF = spark.read.option("mergeSchema", "true").parquet("abfss://[email protected]/Address/*/*/*/*.parquet")
Then add a column containing the source file path using the input_file_name function:
import pyspark.sql.functions as F
unPartitionedDF = unPartitionedDF.withColumn('file_path', F.input_file_name())
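With the folder layout from your question, file_path should hold full paths along the lines of abfss://[email protected]/Address/2021/01/31/part-00000.parquet (the file name here is only illustrative).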
Then you can split the values of the new file_path column into three separate columns:
df = unPartitionedDF.withColumn('year', F.split(F.col('file_path'), '/').getItem(3)) \
    .withColumn('month', F.split(F.col('file_path'), '/').getItem(4)) \
    .withColumn('day', F.split(F.col('file_path'), '/').getItem(5))
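Before hard-coding those indices, a quick sanity check is to print one full path and count its /-separated segments:

# Show a single unabbreviated path so the segment positions are easy to count
unPartitionedDF.select('file_path').show(1, truncate=False)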
As that check illustrates, the index you pass to getItem depends on the exact folder depth of your storage path.
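If counting segments feels brittle, an alternative sketch is to match the date pattern itself with regexp_extract; the regex below is my assumption based on the /yyyy/mm/dd/ layout you described:

import pyspark.sql.functions as F

# Assumes the last three folders before the file name are always yyyy/mm/dd
pattern = r'/(\d{4})/(\d{2})/(\d{2})/[^/]+$'

df = (unPartitionedDF
      .withColumn('year', F.regexp_extract('file_path', pattern, 1).cast('int'))
      .withColumn('month', F.regexp_extract('file_path', pattern, 2).cast('int'))
      .withColumn('day', F.regexp_extract('file_path', pattern, 3).cast('int')))

The .cast('int') calls give you numeric columns to match the LongType fields in your customSchema; drop them if strings are fine.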
I hope this resolves your problem.