scala apache-spark dataframe amazon-s3 qubole

Divide Spark DataFrame data into separate files


I have the following DataFrame input from an S3 file and need to transform the data into the desired output shown below. I am using Spark version 1.5.1 with Scala, but could switch to Spark with Python. Any suggestions are welcome.

DataFrame Input:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

Desired Output:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$ cat bob/mouse/file.csv
bbbbb
ccccc

terminal$ cat bob/dog/file.csv
ddddd

Here is my existing Spark Scala code that I have tried:

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)

Current Output:

[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]

One problem with my existing code is that groupBy returns a GroupedData object, and I probably don't want to run a count/sum/agg function on that data. I am looking for a better technique to group and output the data. The dataset is very large.


Solution

  • This can be achieved using the partitionBy option of the DataFrameWriter. The general syntax is as follows:

    df.write.partitionBy("name", "animal").format(...).save(...)
    

    Unfortunately, the only plain-text format that supports partitioning in Spark 1.5 is JSON.

    If you can upgrade your Spark installation to:

    • 1.6 - you can use partitionBy with the text format. 1.6 is also required if you need a single output file per group (via repartition).
    • 2.0 - you can use partitionBy with the csv format.
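
    The 1.6 option could look roughly like the sketch below. It assumes the column names from the question; the object name PartitionedTextWrite, the function name write, and the outputPath parameter are hypothetical names introduced only for illustration. Repartitioning by the grouping columns first sends every row of a group to the same task, so each partition directory should end up with a single part file.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper, not part of the original answer.
object PartitionedTextWrite {
  def write(df: DataFrame, outputPath: String): Unit =
    df
      // The text format accepts exactly one string column besides the
      // partition columns, so keep only what is needed.
      .select(col("name"), col("animal"), col("data").cast("string"))
      // Spark 1.6+: co-locate all rows of a (name, animal) group in one task.
      .repartition(col("name"), col("animal"))
      .write
      .partitionBy("name", "animal")
      // Spark 1.6+: plain-text writer.
      .text(outputPath)
}
```

    Note that partitionBy produces Hive-style directories such as name=bob/animal=mouse/part-..., not bob/mouse/file.csv, so a rename step is still needed to get the exact layout from the question. On 2.0, .csv(outputPath) can take the place of .text(outputPath).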

    I believe that in 1.5 your best option is to write the files as JSON and then convert the individual output files.

    If the number of distinct (name, animal) pairs is small, you can try performing a separate write for each group:

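    import org.apache.spark.sql.Row
    import sqlc.implicits._  // enables the $"col" syntax

    // Collect the distinct (name, animal) pairs to the driver
    val dist = df.select("name", "animal").distinct().rdd.collect.map {
      case Row(name: String, animal: String) => (name, animal)
    }

    // One filtered write per pair; "csv" is built in from Spark 2.0,
    // on 1.x use the spark-csv package ("com.databricks.spark.csv")
    for {
      (name, animal) <- dist
    } df.where($"name" === name && $"animal" === animal)
        .select($"data").write.format("csv").save(s"/prefix/$name/$animal")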
    

    but this won't scale as the number of combinations grows.