scala, apache-spark

How to use collect_list() without typing each column name in Scala Spark


I would like to use collect_list() without having to type or paste every single column name as the input, because my data has far too many columns to list out by hand. How do I pass multiple column names to this function at once? My data is enormous (55 billion rows, 2,300 columns), so I would appreciate solutions appropriate for big data.

Example:

import org.apache.spark.sql.functions._
import spark.implicits._ // for .toDF; pre-imported in spark-shell and notebooks

val Enormous_df = Seq(("Apples", "a", "b"),
                      ("Apples", "a", "d"),
                      ("Banana", "e", "f"))
                      .toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")

Enormous_df.show()
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                          a|                             b|
|                 Apples|                          a|                             d|
|                 Banana|                          e|                             f|
+-----------------------+---------------------------+------------------------------+

Continuing with example:

//Creating a sequence of the column titles I desire. In this case, it's all of them.
val All_Thousands_of_Column_Titles = Seq(Enormous_df.drop("Grouping_by_this_Column").columns)

//This doesn't work. How do I do this?
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))

The error I get is:

command-33353371:11: error: overloaded method value collect_list with alternatives:
  (columnName: String)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (Seq[Array[String]])
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))

The df I am trying to make would be:

+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                     [a, a]|                        [b, d]|
|                 Banana|                        [e]|                           [f]|
+-----------------------+---------------------------+------------------------------+

In PySpark, a working equivalent of what I am trying to do is:

import pyspark.pandas as ps
Enormous_df=ps.DataFrame({"Grouping_by_this_Column":["Apples","Apples","Banana"],
                          "One_of_Thousands_of_Columns":["a","a","d"],
                          "Second_of_Thousands_of_Columns":["b","e","f"]})\
              .to_spark()

The_df_I_Want= Enormous_df.groupby("Grouping_by_this_Column").agg({Column_Title:"collect_list" \
                                                                   for Column_Title in Enormous_df.drop("Grouping_by_this_Column")\
                                                                                                  .columns})\
                          .withColumnRenamed("collect_list(One_of_Thousands_of_Columns)","One_of_Thousands_of_Columns")\
                          .withColumnRenamed("collect_list(Second_of_Thousands_of_Columns)","Second_of_Thousands_of_Columns")

The_df_I_Want.show()
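
For comparison, on the Scala side RelationalGroupedDataset.agg also accepts a Map from column name to aggregate function name, mirroring this dict style. A minimal sketch against the Scala Enormous_df above (aggMap and Mapped_df are my own names; the result columns come back named "collect_list(<column>)", so the same renaming caveat applies):

// Sketch only: the Map-based agg overload resolves "collect_list" by name
val aggMap = Enormous_df.drop("Grouping_by_this_Column")
                        .columns
                        .map(columnTitle => columnTitle -> "collect_list")
                        .toMap

val Mapped_df = Enormous_df.groupBy("Grouping_by_this_Column").agg(aggMap)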

Solution

  • If I understand you correctly, Gaël's answer is right, but needs a slight modification.

    Before that modification can take place, though, you should not use Seq(df.columns): df.columns already returns an Array[String], so wrapping it in Seq produces the Seq[Array[String]] that the error message complains about. With that in mind, you can write

    // Every column except the grouping column
    val All_Thousands_of_Column_Titles = Enormous_df.drop("Grouping_by_this_Column").columns
    
    // One collect_list per column, aliased back to its original name.
    // The lambda parameter is named c to avoid shadowing functions.col.
    val aggregations = All_Thousands_of_Column_Titles.map(c => collect_list(c).as(c))
    
    // agg's signature is (Column, Column*), hence the head/tail split
    val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(aggregations.head, aggregations.tail: _*)
    
    The_df_I_Want.show()
    

    And you will get

    +-----------------------+---------------------------+------------------------------+
    |Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
    +-----------------------+---------------------------+------------------------------+
    |                 Apples|                     [a, a]|                        [b, d]|
    |                 Banana|                        [e]|                           [f]|
    +-----------------------+---------------------------+------------------------------+
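
    As a side note, with 2,300 aggregate expressions over 55 billion rows it may be cheaper to run a single collect_list over whole rows packed into a struct and unpack the fields afterwards. A sketch of that idea (rowStruct, collected, and The_df_I_Want_Too are my own names; everything else is as above):

    // Pack every non-grouping column into one struct per row
    val rowStruct = struct(All_Thousands_of_Column_Titles.map(c => col(c)): _*)

    // A single aggregation instead of thousands
    val collected = Enormous_df
      .groupBy("Grouping_by_this_Column")
      .agg(collect_list(rowStruct).as("rows"))

    // Extracting a field from an array of structs yields an array of that
    // field's values, so each column comes back as its own array
    val The_df_I_Want_Too = collected.select(
      col("Grouping_by_this_Column") +:
      All_Thousands_of_Column_Titles.map(c => col("rows." + c).as(c)): _*
    )

    Either way, keep in mind that collect_list does not guarantee element order after a shuffle, and that each group's collected arrays must fit in memory on a single executor.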