Tags: java, apache-spark, spark3

Does Spark 3.0.1 support custom Aggregators on window functions?


I wrote a custom Aggregator (an extension of org.apache.spark.sql.expressions.Aggregator), and Spark invokes it correctly as an aggregate function under a groupBy:

sparkSession
    .createDataFrame(...)
    .groupBy(col("id"))
    .agg(
        new MyCustomAggregator().toColumn().name("aggregation_result"))
    .show();

I would like to use it within a window function, though, because ordering matters to me. I've tried invoking it like this:

sparkSession
    .createDataFrame(...)
    .withColumn("aggregation_result", new MyCustomAggregator().toColumn().over(Window
        .partitionBy(col("id"))
        .orderBy(col("order"))))
    .show();

That's the error I get:

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `id` ORDER BY `order` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.

Is it at all possible to use custom Aggregators as window functions in Spark 3.0.1? If so, what am I missing here?


Solution

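  • Yes, Spark 3 does support custom Aggregators as window functions. Calling toColumn() on the Aggregator fails with the UnspecifiedFrame error above, but wrapping the Aggregator with functions.udaf(...) gives an untyped UserDefinedFunction, and the column it returns accepts .over(...) like any built-in aggregate.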

    Here is the Java code:

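    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.expressions.Window;
    import org.apache.spark.sql.functions;

    // Wrap the typed Aggregator as an untyped UDF (functions.udaf(Aggregator, Encoder), Spark 3.0+)
    UserDefinedFunction myCustomAggregation = functions.udaf(new MyCustomAggregator(), Encoders.bean(AggregationInput.class));
    
    sparkSession
        .createDataFrame(...)
        // apply(...) returns a plain Column, so it can be evaluated over a window spec
        .withColumn("aggregation_result", myCustomAggregation.apply(col("aggregation_input1"), col("aggregation_input2")).over(Window
            .partitionBy(col("id"))
            .orderBy(col("order"))))
        .show();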
    

    AggregationInput here is a simple DTO (a JavaBean) holding the row values your aggregation function needs as input.
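A minimal sketch of such a DTO, assuming the two input columns hold a string and a long; the property names input1 and input2 are hypothetical. Encoders.bean expects a public JavaBean with a public no-arg constructor and getter/setter pairs. As far as I can tell, the udaf input columns are bound to the bean's fields by position, and the bean encoder orders properties alphabetically, so it is worth checking that the columns passed to apply(...) follow that order in your Spark version.

```java
import java.io.Serializable;

// Hypothetical input bean for Encoders.bean(AggregationInput.class).
// Property names and types are assumptions; adapt them to your aggregation.
public class AggregationInput implements Serializable {
    private String input1;
    private long input2;

    // Encoders.bean needs a public no-arg constructor
    public AggregationInput() {}

    public String getInput1() { return input1; }
    public void setInput1(String input1) { this.input1 = input1; }

    public long getInput2() { return input2; }
    public void setInput2(long input2) { this.input2 = input2; }
}
```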

    So whether you aggregate under groupBy or in a window function, you still implement org.apache.spark.sql.expressions.Aggregator; only the way you invoke it differs.
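Why the ordering matters: with orderBy and no explicit frame, Spark's default window frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so each row receives the aggregate of every row before it plus its peers (rows with the same order value). The plain-Java sketch below models that behaviour without Spark. SimpleAggregator only mirrors the zero/reduce/finish methods of Spark's Aggregator, and InputRow, RunningWindow and ConcatInOrder are hypothetical names for illustration; it models the semantics, not how Spark executes the window internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in mirroring zero/reduce/finish of Spark's Aggregator (no encoders, no merge).
interface SimpleAggregator<IN, BUF, OUT> {
    BUF zero();
    BUF reduce(BUF buffer, IN input);
    OUT finish(BUF buffer);
}

// Hypothetical row: partition key, ordering key, value to aggregate.
final class InputRow {
    final String id;
    final int order;
    final String value;

    InputRow(String id, int order, String value) {
        this.id = id;
        this.order = order;
        this.value = value;
    }
}

// Order-sensitive example aggregator: concatenates values in frame order.
class ConcatInOrder implements SimpleAggregator<String, String, String> {
    public String zero() { return ""; }
    public String reduce(String buffer, String input) { return buffer + input; }
    public String finish(String buffer) { return buffer; }
}

class RunningWindow {
    // Models "PARTITION BY id ORDER BY order" with the default RANGE frame.
    // Returns one result per row, partitions in first-seen order, rows sorted by order.
    static <BUF, OUT> List<OUT> evaluate(List<InputRow> rows, SimpleAggregator<String, BUF, OUT> agg) {
        Map<String, List<InputRow>> partitions = new LinkedHashMap<>();
        for (InputRow r : rows) {
            partitions.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        }
        List<OUT> results = new ArrayList<>();
        for (List<InputRow> part : partitions.values()) {
            part.sort(Comparator.comparingInt((InputRow r) -> r.order)); // stable sort
            BUF buffer = agg.zero();
            int i = 0;
            while (i < part.size()) {
                // Fold in the whole peer group (same order value) before emitting.
                int j = i;
                while (j < part.size() && part.get(j).order == part.get(i).order) {
                    buffer = agg.reduce(buffer, part.get(j).value);
                    j++;
                }
                OUT out = agg.finish(buffer);
                for (int k = i; k < j; k++) {
                    results.add(out); // every peer gets the same result
                }
                i = j;
            }
        }
        return results;
    }
}
```

If you need strictly per-row results even when order values tie, adding .rowsBetween(Window.unboundedPreceding(), Window.currentRow()) to the window spec should switch Spark to a ROWS frame.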