Tags: scala, apache-spark, apache-spark-dataset

How to name aggregate columns?


I'm using Spark in Scala, and my aggregated columns are anonymous. Is there a convenient way to rename multiple columns of a dataset? I considered imposing a schema with as, but the key column is a struct (due to the groupBy operation), and I can't figure out how to define a case class with a StructType in it.
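
For concreteness, here is a minimal sketch of how I end up in this state (the Edge and EdgeRecord case classes and the sample data are hypothetical stand-ins for my actual pipeline):

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.sum

// Hypothetical stand-ins for the real data; only the shape matters.
case class Edge(src: Short, dst: Short)
case class EdgeRecord(edge: Edge, count: Long)

val spark = SparkSession.builder.appName("example").master("local[*]").getOrCreate()
import spark.implicits._

val edges: Dataset[EdgeRecord] =
  Seq(EdgeRecord(Edge(1, 2), 3L), EdgeRecord(Edge(1, 2), 2L)).toDS()

// Grouping by a case-class key yields a struct column named "key",
// and the aggregate column keeps the anonymous name "sum(count)".
val edge_count = edges.groupByKey(_.edge).agg(sum($"count").as[Long])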

I tried defining a schema as follows:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
                                                             StructField("dst", IntegerType), true)), 
                              StructField("count", LongType, true))
edge_count.as[returnSchema]

but I got a compile error:

Message: <console>:74: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean)
       val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true),
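
The overloads listed in the error point at the problem: StructType's apply takes a single Seq (or Array, or java.util.List) of StructFields, not a bare field list, and a misplaced closing parenthesis also turned the nested nullable flag into a stray third argument. For reference, here is a version of the schema that at least constructs (untested against my actual pipeline):

import org.apache.spark.sql.types._

val returnSchema =
  StructType(Seq(
    StructField("edge",
                StructType(Seq(StructField("src", IntegerType, nullable = true),
                               StructField("dst", IntegerType, nullable = true))),
                nullable = true),
    StructField("count", LongType, nullable = true)))

Even with a well-formed schema, though, edge_count.as[returnSchema] won't compile: as expects a type parameter with an Encoder, not a runtime StructType value.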

Solution

  • I ended up using aliases with the select statement; e.g.,

    ds.select($"key.src".as[Short], 
              $"key.dst".as[Short], 
              $"sum(count)".alias("count").as[Long])
    

    First I had to use printSchema to determine the derived column names:

    > ds.printSchema
    
    root
     |-- key: struct (nullable = false)
     |    |-- src: short (nullable = false)
     |    |-- dst: short (nullable = false)
     |-- sum(count): long (nullable = true)
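
    A related note: the anonymous column can also be avoided entirely by naming the aggregate when it is created, which removes the printSchema detective work for that column. A sketch, assuming a groupByKey pipeline over a pre-aggregation dataset (edges here is an assumed name) with an edge struct and a count column:

    import org.apache.spark.sql.functions.sum

    // Assumes spark.implicits._ is in scope, as in the select above;
    // "edges" is a stand-in name for the pre-aggregation dataset.
    edges.groupByKey(_.edge)
         .agg(sum($"count").as[Long].name("count"))

    TypedColumn.name renames the column while keeping it typed, so the aggregate comes out as "count" directly; the grouping key is still a struct named "key", so flattening src and dst into top-level columns still takes a select like the one above.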