I have created a cube on two dimensions in spark using scala. The data is coming from two different dataframes. The names are "borrowersTable" and 'loansTable". They have been created with the "createOrReplaceTempView" option so that it is possible to run sql queries on them. The goal was to create the cube on two dimensions (gender and department) summing up the total number of loans for books for a library. With the command
val cube=spark.sql("""
select
borrowersTable.department,borrowersTable.gender,count(loansTable.bibno)
from borrowersTable,loansTable
where borrowersTable.bid=loansTable.bid
group by borrowersTable.gender,borrowersTable.department with cube;
""")
i create the cube which has this result:
Then using the command
cube.write.format("csv").save("file:///....../data/cube")
Spark creates a folder named cube which includes 34 files named part*.csv which include columns for department, gender, and sum of loans (every group by).
The goal here is to create files taking the names of the first two columns (attributes) in this way: for GroupBy (Attr1, Attr2) the file should be named Attr1_Attr2.
e.g. For (Economics, M) the file should be named Economics_M. For (Mathematics, null) it should be Mathematics_null and so on. Any help would be appreciated.
When you call df.write.format("...").save("...")
each Spark executor saves partitions it holds into corresponding part* file. This is the mechanism for storing and loading big files and you can not change it. However you can try the following alternatives whatever works better in you case:
cube
.write
.partitionBy("department", "gender")
.format("csv")
.save("file:///....../data/cube")
This will create subfolders with names like department=Physics/gender=M
still containing part* files inside. This structure can be later loaded back to Spark and used for effective joins by partitioned columns.
val csvRows = cube
.collect()
.foreach {
case Row(department: String, gender: String, _) =>
// just the simple way to write CSV, you can use any CSV lib here as well
Files.write(Paths.get(s"$department_$gender.csv"), s"$department,$gender".getBytes(StandardCharsets.UTF_8))
}
If you call collect()
you receive you data frame on driver side as Array[Row]
and then you can do with it whatever you want. The important limitation of this approach is that you data frame should fit into driver's memory.