Tags: apache-spark, apache-spark-sql, aggregate-functions, user-defined-functions

Can you reuse one of the buffers in the merge method of Spark Aggregators?


The merge method of the Apache Spark Aggregator class takes two buffers and combines them into one. Can I reuse one of the buffers (possibly modifying it) instead of creating a new one to be returned?

I noticed this in the documentation of the reduce method:

For performance, the function may modify b and return it instead of constructing new object for b.

but there's no similar message on merge. I also found this example in the Spark code which returns one of the buffers instead of creating a new one, so I'm assuming that (at least) this is possible.


Solution

  • merge is called by ComplexTypedAggregateExpression, which implements TypedImperativeAggregate, or by udaf.ScalaAggregator, which is wrapped in a TypedImperativeAggregate when calling toColumn.

    The scaladoc for that merge shows:

    Merges an input aggregation object into aggregation buffer object and returns a new buffer object. For performance, the function may do in-place merge and return it instead of constructing new buffer object.
    This is typically called when doing PartialMerge or Final mode aggregation.
    Params:
    buffer – the aggregation buffer object used to store the aggregation result.
    input – an input aggregation object. Input aggregation object can be produced by de-serializing the partial aggregate's output from Mapper side.
    

    i.e. the 1st param is the current partition's buffer.

    The actual caller of that merge function is also in TypedImperativeAggregate:

    final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
      val bufferObject = getBufferObject(buffer)
      // The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
      val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
      buffer(mutableAggBufferOffset) = merge(bufferObject, inputObject)
    }
    

    i.e. the buffer you return from merge is stored back and reused as this partition's buffer.

    You can reuse either the left or the right object as you see fit.
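
As an illustration of the point above, here is a minimal sketch of an Aggregator whose merge mutates one of its two arguments and returns it. The aggregator itself (DistinctCount), its mutable.HashSet buffer, and the Kryo buffer encoder are assumptions made for this example and do not come from the Spark code base. Returning whichever argument is larger relies on the conclusion above that either object may be reused.

```scala
import scala.collection.mutable
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator: counts distinct Long values, using a mutable set as the buffer.
object DistinctCount extends Aggregator[Long, mutable.HashSet[Long], Long] {

  // Creates a fresh set on every call. This matters because reduce and merge
  // mutate buffers in place, so a shared zero value would be corrupted.
  def zero: mutable.HashSet[Long] = mutable.HashSet.empty[Long]

  // In-place update, as the reduce documentation allows: modify b and return it.
  def reduce(b: mutable.HashSet[Long], a: Long): mutable.HashSet[Long] = {
    b += a
    b
  }

  // In-place merge: fold the smaller set into the larger one and return the
  // larger one, so no new buffer is allocated and fewer elements are copied.
  def merge(b1: mutable.HashSet[Long], b2: mutable.HashSet[Long]): mutable.HashSet[Long] = {
    val (big, small) = if (b1.size >= b2.size) (b1, b2) else (b2, b1)
    big ++= small
    big
  }

  def finish(reduction: mutable.HashSet[Long]): Long = reduction.size.toLong

  // Assumption for this sketch: serialize the buffer as an opaque Kryo blob.
  def bufferEncoder: Encoder[mutable.HashSet[Long]] = Encoders.kryo[mutable.HashSet[Long]]
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
```

On a Dataset[Long] named ds (a hypothetical name), this could be applied with ds.select(DistinctCount.toColumn). If you prefer to stay closest to the wording of the scaladoc ("in-place merge"), always merging b2 into b1 and returning b1 is the more conservative variant.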