apache-flink, flink-streaming

Pre-shuffle aggregation in Flink


We are migrating a Spark job to Flink. In Spark we used pre-shuffle aggregation; is there a way to perform a similar operation in Flink? We are consuming data from Apache Kafka and using a keyed tumbling window to aggregate the data. We want to aggregate the data in Flink before the shuffle takes place.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html


Solution

  • Yes, it is possible, and I will describe three ways. The first is already built into the Flink Table API. For the second you build your own pre-aggregate operator. The third is a dynamic pre-aggregate operator which adjusts the number of events to pre-aggregate before the shuffle phase.

    Flink Table API

    As shown here, you can do MiniBatch Aggregation or Local-Global Aggregation; the second option is better. You basically tell Flink to create mini-batches of at most 5000 events (flushed at least every 5 seconds) and pre-aggregate them before the shuffle phase.

    // instantiate table environment
    TableEnvironment tEnv = ...
    
    // access flink configuration
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true");
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    configuration.setString("table.exec.mini-batch.size", "5000");
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
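
    With these options set, an aggregation query is compiled into a local (pre-shuffle) aggregation plus a global aggregation after the shuffle. A minimal sketch of such a query, assuming a hypothetical Kafka-backed source table named kafka_events:

    // the TWO_PHASE strategy splits this GROUP BY into a local aggregation
    // before the shuffle and a global aggregation after it
    tEnv.executeSql(
        "SELECT user_id, COUNT(*) AS cnt "
            + "FROM kafka_events "   // hypothetical source table
            + "GROUP BY user_id");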
    

    Flink Stream API

    This way is more cumbersome because you have to create your own operator using OneInputStreamOperator and call it using transform(). Here is an example based on the BundleOperator.

    public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
            extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
            implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {

        // fields referenced below; open() initializes the bundle and the
        // collector and registers this operator as the trigger callback (omitted)
        private final BundleTrigger<IN> bundleTrigger;
        private transient Map<K, V> bundle;
        private transient Collector<OUT> collector;
        private transient int numOfElements = 0;

        protected AbstractMapStreamBundleOperator(
                MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger) {
            super(function);
            this.bundleTrigger = bundleTrigger;
        }

        // subclasses define how to extract the bundle key from an input record
        protected abstract K getKey(IN input) throws Exception;

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            // get the key and value for the map bundle
            final IN input = element.getValue();
            final K bundleKey = getKey(input);
            final V bundleValue = this.bundle.get(bundleKey);

            // get a new value after adding this element to the bundle
            final V newBundleValue = userFunction.addInput(bundleValue, input);

            // update the map bundle
            bundle.put(bundleKey, newBundleValue);

            numOfElements++;
            bundleTrigger.onElement(input);
        }

        @Override
        public void finishBundle() throws Exception {
            if (!bundle.isEmpty()) {
                numOfElements = 0;
                userFunction.finishBundle(bundle, collector);
                bundle.clear();
            }
            bundleTrigger.reset();
        }
    }
    

    The callback interface defines when the pre-aggregate is triggered. Every time the stream reaches the bundle limit, at if (count >= maxCount), your pre-aggregate operator emits its events to the shuffle phase.

    public class CountBundleTrigger<T> implements BundleTrigger<T> {
        private final long maxCount;
        private transient BundleTriggerCallback callback;
        private transient long count = 0;

        public CountBundleTrigger(long maxCount) {
            Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
            this.maxCount = maxCount;
        }

        @Override
        public void registerCallback(BundleTriggerCallback callback) {
            this.callback = Preconditions.checkNotNull(callback, "callback is null");
        }

        @Override
        public void onElement(T element) throws Exception {
            count++;
            if (count >= maxCount) {
                callback.finishBundle();
                reset();
            }
        }

        @Override
        public void reset() {
            count = 0;
        }
    }
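
    Before wiring everything together you need instances of the trigger and of a key selector for the bundle. A minimal sketch, assuming a Tuple2<String, Long> stream and a hypothetical limit of 100 events per bundle (the names bundleTrigger and keyBundleSelector are the ones used below):

    // flush the bundle to the shuffle phase every 100 elements (hypothetical value)
    CountBundleTrigger<Tuple2<String, Long>> bundleTrigger = new CountBundleTrigger<>(100);

    // pre-aggregate per key; here the key is the first tuple field
    KeySelector<Tuple2<String, Long>, String> keyBundleSelector = value -> value.f0;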
    

    Then you call your operator using transform():

    myStream.map(....)
        .transform(metricCombiner, info, new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
        .map(...)
        .keyBy(...)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)));
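
    The actual pre-aggregation logic lives in the MapBundleFunction that you pass to the operator (myMapBundleFunction above). A minimal sketch that sums counts per key, assuming the addInput()/finishBundle() contract used by the operator shown earlier:

    public class SumBundleFunction extends MapBundleFunction<String, Long, Tuple2<String, Long>, Tuple2<String, Long>> {

        @Override
        public Long addInput(@Nullable Long value, Tuple2<String, Long> input) {
            // merge one record into the partial aggregate for its key
            return value == null ? input.f1 : value + input.f1;
        }

        @Override
        public void finishBundle(Map<String, Long> buffer, Collector<Tuple2<String, Long>> out) {
            // emit every partial aggregate to the shuffle phase
            for (Map.Entry<String, Long> entry : buffer.entrySet()) {
                out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
            }
        }
    }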
    

    A dynamic pre-aggregation

    In case you wish to have a dynamic pre-aggregate operator, check the AdCom - Adaptive Combiner for stream aggregation. It adjusts the pre-aggregation based on backpressure signals, making the best possible use of the shuffle phase.