apache-spark

Interpolating target path in Spark


I'm using Spark 3.5 to process some requests, which arrive bucketed by month and day:

/09/27/json-lines-file-with-a-long-name

I'm doing some filtering and want the results to end up bucketed in the very same way (processed requests for Sept 27 go into the 09/27 directory). I have created month and day columns, so the question is whether it's technically possible to recreate the same directory bucketing by interpolating the path in some way. Here's an example of how it might look:

// obviously this doesn't work
df
  .write
  .options(...)
  .json("$month/$day")

I'm aware that this will likely incur a performance penalty and require special partitioning (which would create uneven partitions in my case). I've seen some examples with partitionBy, which are almost what I'm looking for, but I believe that functionality is not available or was renamed in Spark 3. I also understand that I can do it in a simple for-loop (see the sketch below), but I'm curious whether a "right" solution exists.
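
For reference, here's a minimal sketch of that for-loop fallback. The column names and the output path are illustrative, and it assumes month and day are string columns:

import spark.implicits._

// one filtered write per (month, day) pair
val pairs = df.select("month", "day").distinct().collect()
for (row <- pairs) {
  val (month, day) = (row.getString(0), row.getString(1))
  df.filter($"month" === month && $"day" === day)
    .drop("month", "day") // the values are already encoded in the path
    .write
    .json(s"output/$month/$day")
}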


Solution

  • As of Spark 3.5, partitionBy still exists and is almost perfect for what you want to do (see the DataFrameWriter documentation).

    It will write your files as follows:

    /month=09/day=27/json-lines-file-with-a-long-name
    

    I don't know of any way to get rid of the column_name= part simply by configuring partitionBy. A workaround is to rename the files afterwards. This approach avoids reading each part of the dataframe separately, which would most likely be much slower.

    // An example: derive two partition columns from id and write partitioned JSON
    import spark.implicits._

    spark.range(5)
         .select($"id" % 2 as "m", $"id" % 4 as "m2", $"id")
         .write.mode("overwrite")
         .partitionBy("m", "m2")
         .json("test2")
    
    > tree test2
    test2
    ├── m=0
    │   ├── m2=0
    │   │   ├── part-00003-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   │   └── part-00015-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   └── m2=2
    │       └── part-00009-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    ├── m=1
    │   ├── m2=1
    │   │   └── part-00006-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   └── m2=3
    │       └── part-00012-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    └── _SUCCESS
    

    Then, if the names are not good enough, you can rename the files. Here is an example for the local file system; a similar approach also works for S3 or HDFS (see the Hadoop FileSystem sketch after the tree output below).

    import java.nio.file.{Files, Path, Paths}

    // Recursively strip the "column=" prefix from partition directories.
    // Children are renamed before their parent, so the paths stay valid.
    def rename_spark_partitions(path: Path): Unit = {
        if (Files.isDirectory(path)) {
            val children = Files.list(path)
            try children.forEach(p => rename_spark_partitions(p))
            finally children.close() // Files.list returns a stream that must be closed
        }
        val name = path.getFileName.toString
        if (name.contains("="))
            Files.move(path, path.resolveSibling(name.split("=")(1)))
    }
    rename_spark_partitions(Paths.get("test2"))
    
    > tree test2
    test2
    ├── 0
    │   ├── 0
    │   │   ├── part-00003-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   │   └── part-00015-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   └── 2
    │       └── part-00009-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    ├── 1
    │   ├── 1
    │   │   └── part-00006-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    │   └── 3
    │       └── part-00012-8d4921cb-1280-416a-8a0c-cc133b15c42d.c000.json
    └── _SUCCESS
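
    For S3 or HDFS, the same renaming pass can be written against the Hadoop FileSystem API. This is only a sketch under a couple of assumptions: rename_hadoop_partitions is a hypothetical helper, "test2" is the example output directory from above, and the path resolves on whatever filesystem Spark's Hadoop configuration points to.

    import org.apache.hadoop.fs.{FileSystem, Path => HPath}

    // Same idea as above, but using Hadoop's FileSystem abstraction so it
    // works for HDFS (and S3 with the appropriate connector on the classpath).
    def rename_hadoop_partitions(fs: FileSystem, path: HPath): Unit = {
        if (fs.getFileStatus(path).isDirectory)
            fs.listStatus(path).foreach(s => rename_hadoop_partitions(fs, s.getPath))
        val name = path.getName
        if (name.contains("="))
            fs.rename(path, new HPath(path.getParent, name.split("=")(1)))
    }

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    rename_hadoop_partitions(fs, new HPath("test2"))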