I would like to use .collect_list() without having to type or paste every single column title as the input, because my data has far too many columns to list out by hand. How do I pass multiple column titles to this function? My data is enormous (55 billion rows, 2,300 columns), so I would appreciate big-data-appropriate solutions if possible.
Example:
import org.apache.spark.sql.functions._
val Enormous_df = Seq(("Apples", "a", "b"),
("Apples", "a", "d"),
("Banana", "e", "f"))
.toDF("Grouping_by_this_Column", "One_of_Thousands_of_Columns", "Second_of_Thousands_of_Columns")
Enormous_df.show()
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                          a|                             b|
|                 Apples|                          a|                             d|
|                 Banana|                          e|                             f|
+-----------------------+---------------------------+------------------------------+
Continuing with the example:
//Creating a sequence of the column titles I desire. In this case, it's all of them.
val All_Thousands_of_Column_Titles = Seq(Enormous_df.drop("Grouping_by_this_Column").columns)
//This doesn't work. How do I do this?
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))
The error I get is:
command-33353371:11: error: overloaded method value collect_list with alternatives:
(columnName: String)org.apache.spark.sql.Column <and>
(e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
cannot be applied to (Seq[Array[String]])
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(collect_list(All_Thousands_of_Column_Titles))
The df I am trying to make would be:
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                     [a, a]|                        [b, d]|
|                 Banana|                        [e]|                           [f]|
+-----------------------+---------------------------+------------------------------+
In PySpark, a working equivalent of what I am trying to do is:
import pyspark.pandas as ps
Enormous_df=ps.DataFrame({"Grouping_by_this_Column":["Apples","Apples","Banana"],
"One_of_Thousands_of_Columns":["a","a","d"],
"Second_of_Thousands_of_Columns":["b","e","f"]})\
.to_spark()
The_df_I_Want= Enormous_df.groupby("Grouping_by_this_Column").agg({Column_Title:"collect_list" \
for Column_Title in Enormous_df.drop("Grouping_by_this_Column")\
.columns})\
.withColumnRenamed("collect_list(One_of_Thousands_of_Columns)","One_of_Thousands_of_Columns")\
.withColumnRenamed("collect_list(Second_of_Thousands_of_Columns)","Second_of_Thousands_of_Columns")
The_df_I_Want.show()
If I understand you correctly, Gaël's answer is right, but needs a slight modification. Before that modification, though, note that you should not wrap the result in Seq(df.columns): df.columns already returns an Array[String] that you can use directly. With that in mind, you can write
val All_Thousands_of_Column_Titles = Enormous_df.drop("Grouping_by_this_Column").columns
// Build one collect_list aggregation per remaining column, keeping each original name.
val aggregations = All_Thousands_of_Column_Titles.map(c => collect_list(c).as(c))
// agg takes one Column plus a varargs of the rest, hence the head/tail split.
val The_df_I_Want = Enormous_df.groupBy("Grouping_by_this_Column").agg(aggregations.head, aggregations.tail: _*)
The_df_I_Want.show()
And you will get:
+-----------------------+---------------------------+------------------------------+
|Grouping_by_this_Column|One_of_Thousands_of_Columns|Second_of_Thousands_of_Columns|
+-----------------------+---------------------------+------------------------------+
|                 Apples|                     [a, a]|                        [b, d]|
|                 Banana|                        [e]|                           [f]|
+-----------------------+---------------------------+------------------------------+
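Since the question asks for big-data-appropriate solutions: with 2,300 separate collect_list aggregations, each group maintains thousands of independent array buffers. A pattern that is sometimes cheaper (an alternative sketch, not part of the answer above, and whether it actually helps depends on your data and group sizes) is to run a single aggregation that collects each row's non-grouping columns as one struct, and then project the per-column arrays back out:

```scala
import org.apache.spark.sql.functions._

// Assumed setup: the same Enormous_df as in the question.
val valueCols = Enormous_df.drop("Grouping_by_this_Column").columns

// One aggregation instead of thousands: pack each row's non-grouping
// columns into a struct, then collect the structs per group.
val packed = Enormous_df
  .groupBy("Grouping_by_this_Column")
  .agg(collect_list(struct(valueCols.map(col): _*)).as("rows"))

// Selecting a struct field through an array of structs yields an array
// of that field, so the original per-column lists can be recovered.
val unpacked = packed.select(
  col("Grouping_by_this_Column") +: valueCols.map(c => col(s"rows.$c").as(c)): _*
)
```

Note that with 55 billion rows, collect_list can still blow up a single group regardless of how the aggregations are expressed, so it is worth checking your group cardinalities first.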