scala | apache-spark-sql | parquet

Get the subfolder as a column while reading multiple parquet files with SparkSQL


I would like to add a column to each DataFrame loaded from Parquet files with SparkSQL, containing a substring of the file's path, and then combine everything into a single DataFrame.

For example, when loading .../subfolder1/my_parquet_file1.parquet and .../subfolder2/my_parquet_file2.parquet, I want to end up with the following DataFrame:

col1 | col2 | subfolder
------------------------
aaa  | bbb  | subfolder1
ccc  | ddd  | subfolder1
eee  | fff  | subfolder2
ggg  | hhh  | subfolder2

The following code loads all the files from a list of paths:

sqlContext.read.schema(schema).parquet(paths: _*)

But since this directly produces the final DataFrame, I can't add the subfolder column based on where each row came from.

Is there a way to do this without sequentially loading each file?
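
For reference, the sequential version I would like to avoid looks roughly like this, a minimal sketch reusing the `paths` and `schema` values above and assuming Spark 2.x `union`:

    import org.apache.spark.sql.functions.lit

    // Sequential variant: one read per file, tagging each with its parent folder.
    val perFile = paths.map { path =>
      val subfolder = path.split("/").takeRight(2).head // second-to-last path segment
      sqlContext.read.schema(schema).parquet(path).withColumn("subfolder", lit(subfolder))
    }
    val combined = perFile.reduce(_ union _)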


Solution

  • Try this:

    import org.apache.spark.sql.functions.{element_at, input_file_name, split}

    val df = spark.read
      .parquet(
        getClass.getResource("/parquet/day/day1/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
          .getPath,
        getClass.getResource("/parquet/day/day2/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy.parquet")
          .getPath
      )
    df.show(false)
    df.printSchema()

    /**
      * +------+
      * |price |
      * +------+
      * |123.15|
      * |123.15|
      * +------+
      *
      * root
      *  |-- price: decimal(5,2) (nullable = true)
      */

    // Derive the subfolder name from each row's source file:
    // split the full path on "/" and take the second-to-last segment.
    df.withColumn("subfolder", element_at(split(input_file_name(), "/"), -2))
      .show(false)

    /**
      * +------+---------+
      * |price |subfolder|
      * +------+---------+
      * |123.15|day1     |
      * |123.15|day2     |
      * +------+---------+
      */