Tags: scala, apache-spark, apache-spark-sql, user-defined-aggregate

When does merge happen in User Defined Aggregating Functions UDAF in Spark


I want to know under which circumstances Spark will perform merge as part of a UDAF computation.

Motivation: I am using a lot of UDAF functions OVER a Window in my Spark project. Often I want to answer a question like:

How many times a credit card transaction was made in the same country as the current transaction in the window of 30 days?

The window starts at the current transaction but does not include it in the count. It needs the value from the current transaction to know which country to count over the past 30 days.

val rollingWindow = Window
  .partitionBy(partitionByColumn)
  .orderBy(orderByColumn.desc)
  .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

I wrote my customUDAF to do the counting. I always use .orderBy(orderByColumn.desc), and thanks to .desc the current transaction appears first in the window during the calculation.
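The effect of the descending sort can be illustrated without Spark: with rows sorted descending on the order column and a frame of rangeBetween(0, windowSize), each row's frame starts at the row itself and extends to older rows within the range, so the current transaction is always first. A plain-Scala sketch with hypothetical data (timestamps as days):

```scala
// Plain-Scala illustration (no Spark): frame membership for
// orderBy(desc) with rangeBetween(0, windowSize).
case class Txn(day: Int, country: String)

val windowSize = 30
// Transactions sorted descending on the order column, as Spark would sort them.
val sorted = List(Txn(100, "PL"), Txn(95, "PL"), Txn(80, "DE"), Txn(60, "PL"))

// For a current row, the frame holds rows whose day lies within
// [current.day - windowSize, current.day]: the current row plus older ones in range.
def frame(current: Txn): List[Txn] =
  sorted.filter(t => t.day <= current.day && t.day >= current.day - windowSize)

val f = frame(Txn(100, "PL"))
// The current transaction is the first row of its own frame;
// Txn(60, "PL") falls outside the 30-day range and is excluded.
assert(f.head == Txn(100, "PL"))
```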

UDAFs require an implementation of a merge function, which merges two intermediate aggregation buffers during parallel computation. If any merges occur, the "current" transaction may differ between buffers and the result of the UDAF would be incorrect.

I wrote a UDAF that counts the number of merges on my dataset and keeps only the first transaction in the window, to be compared with the current transaction.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructType}

class FirstUDAF() extends UserDefinedAggregateFunction {

  // Two input columns: the value to track and the ordering column.
  def inputSchema: StructType = new StructType()
    .add("x", StringType)
    .add("y", StringType)

  // Intermediate state: first value seen plus a counter of merged buffers.
  def bufferSchema: StructType = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType: DataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
    buffer(1) = 1 // each fresh buffer counts itself once
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Keep only the first value seen by this buffer.
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // If the left buffer is still empty, take the first value from the right one.
    if (buffer1.getString(0) == "")
      buffer1(0) = buffer2.getString(0)
    // Summing the per-buffer counters yields the total number of buffers merged.
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row): Row = buffer
}

When I run it with Spark 2.0.1 on a local master with 16 CPUs, merge is never called and the first transaction in the window is always the current transaction. This is what I want. In the near future I will run my code on a dataset 100x bigger and on a real distributed Spark cluster, and I want to know whether merges can happen there.

Questions:

  • Under which circumstances/conditions do merges take place in a UDAF?
  • Do Windows with orderBy ever have merges?
  • Is it possible to tell Spark not to do merges?

Solution

  • Under which circumstances/conditions do merges take place in a UDAF?

    merge is called when partial applications of the aggregate function ("map side aggregation") are merged after the shuffle ("reduce side aggregation").
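    This call pattern can be mimicked in plain Scala (no Spark API; all names below are illustrative): each partition builds its own buffer via update ("map side"), and merge combines the partial buffers after the shuffle ("reduce side"):

```scala
// Plain-Scala mock of the UDAF lifecycle: initialize/update per partition,
// then merge across partition buffers -- the point at which merge runs in Spark.
case class Buffer(var count: Int)

def initialize(): Buffer = Buffer(0)
def update(b: Buffer, row: Int): Unit = b.count += row // toy aggregation: a sum
def merge(b1: Buffer, b2: Buffer): Unit = b1.count += b2.count

val partitions = List(List(1, 2, 3), List(4, 5)) // data split across 2 partitions

// Map side: one buffer per partition, fed only that partition's rows.
val partials = partitions.map { part =>
  val b = initialize()
  part.foreach(update(b, _))
  b
}

// Reduce side: partial buffers are merged; with a single partition
// (or data already partitioned by the key) this step never fires.
val result = partials.reduce { (b1, b2) => merge(b1, b2); b1 }
// result.count == 15
```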

  • Do Windows with orderBy ever have merges?

    In the current implementation, never. As of now, window functions are just a fancy groupByKey, and there is no partial aggregation. This is of course an implementation detail and might change without notice in the future.

  • Is it possible to tell Spark not to do merges?

    It is not. However, if the data is already partitioned by the aggregation key, there is no need for merge and only combine is used.

    Finally:

    How many times a credit card transaction was made in the same country as the current transaction in the window of 30 days?

    does not call for UDAFs or window functions. I would probably create tumbling windows with o.a.s.sql.functions.window, aggregate by user, country and window and join back with the input.
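    The suggested alternative can be sketched without the Spark API: bucket transactions into fixed tumbling windows, count per (user, country, window), then look the count up for each input row. All names and data below are hypothetical; in Spark the bucketing would be done with o.a.s.sql.functions.window and the lookup with a join back to the input:

```scala
// Plain-Scala sketch of the tumbling-window + join idea (hypothetical data).
case class Txn(user: String, day: Int, country: String)

val windowDays = 30
val txns = List(
  Txn("u1", 3, "PL"), Txn("u1", 10, "PL"), Txn("u1", 40, "PL"), Txn("u1", 12, "DE"))

// Stand-in for functions.window: assign each transaction to a 30-day bucket.
def bucket(day: Int): Int = day / windowDays

// Stand-in for groupBy(user, country, window).count(): aggregate per key and bucket.
val counts: Map[(String, String, Int), Int] =
  txns.groupBy(t => (t.user, t.country, bucket(t.day))).map { case (k, v) => (k, v.size) }

// Stand-in for the join back with the input: attach each row's window count.
val joined = txns.map(t => (t, counts((t.user, t.country, bucket(t.day)))))
// e.g. Txn("u1", 3, "PL") shares bucket 0 with Txn("u1", 10, "PL"), giving count 2
```

    Note that, unlike the sliding window in the question, this counts within fixed buckets; a transaction near a bucket boundary sees only same-bucket neighbors.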