Search code examples
scalaapache-sparkcollectlis

Spark collect_list change data_type from array to string


I am having a following aggregation

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))

Here I am doing some aggregation and collecting the result with collect_list. Earlier we were using spark 1 and it was giving me below data types.

 |-- final_data1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_data2: array (nullable = true)
 |    |-- element: string (containsNull = true)

Now we have to migrate to spark 2 but we are getting below schema.

|-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

On getting first() record below is the difference

spark 1.6

[2020-09-26, Ayush, 103.67] => datatype string

spark 2 

WrappedArray(2020-09-26, Ayush, 103.67)

How can I keep the same data type?

Edit - Tried Using Concat

One way I got exact schema like Spark 1.6 is by using concat like this

val df_date_agg = df
    .groupBy($"msisdn",$"event_date",$"network")
    .agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
    .groupBy($"msisdn")
    .agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))

Will it affect my code performance?? Is there a better way to do this?


Solution

  • Since you want a string representation of an array, how about casting the array into a string like this?

    val df_date_agg = df
        .groupBy($"a",$"b",$"c")
        .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
        .groupBy($"a")
        .agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
             collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))
    

    It might simply be what your old version of spark was doing.

    The solution you propose would probably work as well by the way but wrapping your column references with lit is not necessary (lit($"event_date")). $"event_date" is enough.