Is there a way to modify each grouped dataset as a whole in Spark?


I have this Dataset and I'd like a more flexible way of grouping and editing the grouped data. As an example, I want to remove the second Random_Text value from each group of Names in this Dataset and concatenate the remaining text:

An example Dataset:

+-------+-----------+
|  Names|Random_Text|
+-------+-----------+
|Michael|      Hello|
|    Jim|       Good|
|    Bob|        How|
|Michael|       Good|
|Michael|    Morning|
|    Bob|        Are|
|    Bob|        You|
|    Bob|      Doing|
|    Jim|        Bye|
+-------+-----------+

I would like the resulting Dataset to look like this:

+-------+-------------+
|  Names|  Random_Text|
+-------+-------------+
|Michael|Hello Morning|
|    Jim|         Good|
|    Bob|How You Doing|
+-------+-------------+

I think I need to define some kind of custom UserDefinedAggregateFunction, but I can't work out what that would look like in Java. I looked through the documentation but couldn't find anything concrete that made sense in Java: https://spark.apache.org/docs/3.0.2/api/java/org/apache/spark/sql/functions.html https://docs.databricks.com/udf/aggregate-scala.html

Dataset<Row> random_text = dtf.groupBy(col("Names")).apply(???)
Dataset<Row> random_text = dtf.groupBy(col("Names")).agg(???)

Solution

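  • You can use the window function row_number to number the rows within each Names group, filter out the row numbered 2, and then concatenate the remaining Random_Text values. Note that ordering the window by Names, the partition column itself, does not define an order within a group, so which row gets number 2 depends on the order in which Spark happens to process the rows. If the data has a column that defines the intended order, use that column in orderBy instead.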

    Required imports:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.expressions.*;
    import static org.apache.spark.sql.functions.*;
    

    Code:

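    Dataset<Row> df = ...; // input Dataset with columns Names and Random_Text

    // Number the rows within each Names group, drop row 2, then join the rest.
    df.withColumn("rn",
            row_number().over(Window.partitionBy("Names").orderBy("Names")))
        .where("rn <> 2")
        .groupBy("Names")
        .agg(concat_ws(" ", collect_list("Random_Text")).as("Random_Text"))
        .show();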

    Output:

    +-------+-------------+
    |  Names|  Random_Text|
    +-------+-------------+
    |    Jim|         Good|
    |Michael|Hello Morning|
    |    Bob|How You Doing|
    +-------+-------------+
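
As a cross-check of what the pipeline above is meant to compute, here is a minimal plain-Java sketch of the same per-group logic with no Spark involved. The class name DropSecondPerGroup and the method dropSecondAndJoin are hypothetical names made up for this illustration. It assumes the rows arrive in the order shown in the question, which Spark itself does not guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: mirrors "number rows per group,
// drop row 2, concatenate the rest" on an in-memory list of {name, text} pairs.
class DropSecondPerGroup {

    public static Map<String, String> dropSecondAndJoin(List<String[]> rows) {
        // LinkedHashMap keeps groups in first-seen order; each list keeps row order.
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }

        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> texts = new ArrayList<>(entry.getValue());
            if (texts.size() >= 2) {
                texts.remove(1); // index 1 is the second row of the group (rn == 2)
            }
            result.put(entry.getKey(), String.join(" ", texts));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] {"Michael", "Hello"},
            new String[] {"Jim", "Good"},
            new String[] {"Bob", "How"},
            new String[] {"Michael", "Good"},
            new String[] {"Michael", "Morning"},
            new String[] {"Bob", "Are"},
            new String[] {"Bob", "You"},
            new String[] {"Bob", "Doing"},
            new String[] {"Jim", "Bye"});

        // prints {Michael=Hello Morning, Jim=Good, Bob=How You Doing}
        System.out.println(dropSecondAndJoin(rows));
    }
}
```

The result matches the table requested in the question only because the input list has a fixed order. In Spark, the equivalent guarantee would have to come from an explicit ordering column.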