When loading parquet files with Spark SQL, I would like to add a column to each DataFrame containing a substring of the file's path, and then combine them into a single DataFrame.
For example, when loading .../subfolder1/my_parquet_file1.parquet
and .../subfolder2/my_parquet_file2.parquet
I want the final DataFrame to look like this:
col1 | col2 | subfolder
------------------------
aaa | bbb | subfolder1
ccc | ddd | subfolder1
eee | fff | subfolder2
ggg | hhh | subfolder2
The following code loads all the files from a list of paths:
sqlContext.read.schema(schema).parquet(paths: _*)
But since this directly produces the final DataFrame, I can't add the subfolder column based on where each row came from.
Is there a way to do this without sequentially loading each file?
Try this: read all the paths in one call, then derive the subfolder from input_file_name().
val df = spark.read
  .parquet(
    getClass.getResource("/parquet/day/day1/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
      .getPath,
    getClass.getResource("/parquet/day/day2/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
      .getPath
  )
df.show(false)
df.printSchema()
/**
* +------+
* |price |
* +------+
* |123.15|
* |123.15|
* +------+
*
* root
* |-- price: decimal(5,2) (nullable = true)
*/
import org.apache.spark.sql.functions.{element_at, input_file_name, split}

// the second-to-last "/"-separated segment of input_file_name() is the parent folder
df.withColumn("subfolder", element_at(split(input_file_name(), "/"), -2))
  .show(false)
/**
* +------+---------+
* |price |subfolder|
* +------+---------+
* |123.15|day1 |
* |123.15|day2 |
* +------+---------+
*/
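Applied to the loader in the question, the same expression should work in a single pass. This is a sketch assuming the schema and paths values from the question, and note that element_at requires Spark 2.4+:

import org.apache.spark.sql.functions.{element_at, input_file_name, split}

// read every path at once, then derive the subfolder from each row's source file path
val combined = sqlContext.read
  .schema(schema)
  .parquet(paths: _*)
  .withColumn("subfolder", element_at(split(input_file_name(), "/"), -2))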