Tags: scala, apache-spark, export-to-csv

Is there a way to export CSV or other files in Spark 3.0.1 using Scala with a name different from part*?


I have created a cube on two dimensions in Spark using Scala. The data comes from two different DataFrames, named "borrowersTable" and "loansTable". Both were registered with createOrReplaceTempView so that SQL queries can be run against them. The goal was to create a cube on two dimensions (gender and department), summing up the total number of book loans for a library. With the command

val cube = spark.sql("""
    select
      borrowersTable.department, borrowersTable.gender, count(loansTable.bibno)
    from borrowersTable, loansTable
    where borrowersTable.bid = loansTable.bid
    group by borrowersTable.gender, borrowersTable.department with cube
""")

I create the cube, which has this result:

[cube-image: result of the cube query with department, gender, and count columns]

Then using the command

cube.write.format("csv").save("file:///....../data/cube")

Spark creates a folder named cube containing 34 files named part*.csv, with columns for department, gender, and the total number of loans (one row for each group-by combination).

The goal here is to create files named after the values of the first two columns (attributes), in this way: for GroupBy (Attr1, Attr2) the file should be named Attr1_Attr2.

e.g. For (Economics, M) the file should be named Economics_M. For (Mathematics, null) it should be Mathematics_null and so on. Any help would be appreciated.


Solution

  • When you call df.write.format("...").save("...") each Spark executor saves the partitions it holds into corresponding part* files. This is the mechanism Spark uses for storing and loading large datasets, and you cannot change it. However, you can try the following alternatives, whichever works better in your case:

    1. partitionBy:
    cube
      .write
      .partitionBy("department", "gender")
      .format("csv")
      .save("file:///....../data/cube")
    

    This will create subfolders with names like department=Physics/gender=M, still containing part* files inside. This structure can later be loaded back into Spark and used for efficient joins on the partition columns (see the read-back sketch at the end of this answer).

    2. collect
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.Row

    cube
      .collect()
      .foreach {
        case Row(department, gender, count) =>
          // just a simple way to write CSV, you can use any CSV lib here as well
          Files.write(
            Paths.get(s"${department}_$gender.csv"),
            s"$department,$gender,$count".getBytes(StandardCharsets.UTF_8))
      }
    

    If you call collect() you receive your data frame on the driver side as an Array[Row], and then you can do with it whatever you want. The important limitation of this approach is that your data frame must fit into the driver's memory (see the sketch below for a variation that avoids materializing everything at once).
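
For completeness, here is a minimal read-back sketch for the partitionBy alternative. The base path is the same elided path from the question, and the column name loans is an assumption since the count column is not aliased in the query. Spark discovers department and gender from the directory names and can prune partitions when you filter on them:

    val restored = spark.read
      .format("csv")
      .load("file:///....../data/cube")        // same base path used in save()
      .toDF("loans", "department", "gender")   // data column first, then the discovered partition columns

    restored
      .where("department = 'Economics' AND gender = 'M'")
      .show()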
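
If the cube is too large to collect() in one go, a hedged variation (assuming Spark 3.0.1 with Scala 2.12) is Dataset.toLocalIterator, which streams rows to the driver one partition at a time instead of materializing the whole result:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.Row

    // Same per-group file naming as above, but rows arrive on the driver
    // one partition at a time.
    cube.toLocalIterator().asScala.foreach {
      case Row(department, gender, count) =>
        Files.write(
          Paths.get(s"${department}_$gender.csv"),
          s"$department,$gender,$count".getBytes(StandardCharsets.UTF_8))
    }

Each row still produces one small CSV per (department, gender) pair, so the naming matches the Attr1_Attr2 goal from the question.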