The merge method of the Apache Spark Aggregator class takes two buffers and combines them into one. Can I reuse one of the buffers (possibly modifying it) instead of creating a new one to be returned?
I noticed this in the documentation of the reduce method:
For performance, the function may modify b and return it instead of constructing new object for b.
but there's no similar message on merge. I also found this example in the Spark code which returns one of the buffers instead of creating a new one, so I'm assuming that (at least) this is possible.
merge is called via ComplexTypedAggregateExpression, which implements TypedImperativeAggregate, or via udaf.ScalaAggregator, which is wrapped in a TypedImperativeAggregate when you call toColumn.
The scaladoc for TypedImperativeAggregate.merge reads:
Merges an input aggregation object into aggregation buffer object and returns a new buffer object. For performance, the function may do in-place merge and return it instead of constructing new buffer object.
This is typically called when doing PartialMerge or Final mode aggregation.
Params:
buffer – the aggregation buffer object used to store the aggregation result.
input – an input aggregation object. Input aggregation object can be produced by de-serializing the partial aggregate's output from Mapper side.
I.e. the first parameter is the current partition's buffer.
The actual caller of that merge function is also in TypedImperativeAggregate:
final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
  val bufferObject = getBufferObject(buffer)
  // The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
  val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
  buffer(mutableAggBufferOffset) = merge(bufferObject, inputObject)
}
I.e. whatever object you return from merge is written back into this partition's buffer slot and re-used for subsequent merges. So yes: you are free to re-use (and mutate) either the left or the right object as you see fit.
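To make the pattern concrete, here is a minimal sketch of an Aggregator whose merge mutates and returns the left buffer instead of allocating a new one. The names (AvgBuffer, MyAvg) are illustrative, not from the question; the in-place style mirrors the typed-average example in Spark's own documentation.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Mutable fields so reduce/merge can update the buffer in place.
case class AvgBuffer(var sum: Double, var count: Long)

object MyAvg extends Aggregator[Double, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)

  // The reduce scaladoc explicitly allows modifying b and returning it.
  def reduce(b: AvgBuffer, a: Double): AvgBuffer = {
    b.sum += a
    b.count += 1
    b
  }

  // In-place merge: mutate b1 and return it rather than constructing
  // a new buffer; Spark stores the returned object back into the
  // partition's buffer slot.
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  def finish(b: AvgBuffer): Double =
    if (b.count == 0) 0.0 else b.sum / b.count

  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```

Note that b2 here is the deserialized partial result from the other side of the merge, so mutating it is equally safe; mutating b1 is simply the more common convention.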