Flink (1.3.2) Broadcast record to every operator exactly once


I have an execution graph much like this:

{"nodes":[{"id":1,"type":"Source: AggregatedData","pact":"Data Source","contents":"Source: AggregatedData","parallelism":1},{"id":2,"type":"AddVirtualKeyFunction","pact":"Operator","contents":"AddVirtualKeyFunction","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Source: FilterInformation","pact":"Data Source","contents":"Source: FilterInformation","parallelism":1},{"id":4,"type":"BroadcastFilterInformation","pact":"Operator","contents":"BroadcastFilterInformation","parallelism":1,"predecessors":[{"id":3,"ship_strategy":"FORWARD","side":"second"}]},{"id":7,"type":"ConnectAndApplyFilterFunction","pact":"Operator","contents":"ConnectAndApplyFilterFunction","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"},{"id":4,"ship_strategy":"HASH","side":"second"}]},{"id":8,"type":"Sink: OutputFilteredData","pact":"Data Sink","contents":"Sink: OutputFilteredData","parallelism":4,"predecessors":[{"id":7,"ship_strategy":"FORWARD","side":"second"}]}]}

(can be visualized here: https://flink.apache.org/visualizer/)

I have a stream of aggregated data ("AggregatedData", ID = 1) which needs to be filtered by some filter coming from another stream ("FilterInformation", ID = 3).

My first approach was to use operator state in my "ConnectAndApplyFilterFunction" (ID = 7), which technically works fine but is limited to a parallelism of 1.

Currently, I'm using a hack: in "AddVirtualKeyFunction" I map my aggregated data to a Tuple2<Integer, AggregatedData>, where the Integer (f0) is a randomly generated number from 0 to 19:

@Override
public Tuple2<Integer, AggregatedData> map(AggregatedData value) throws Exception {
    return new Tuple2<>(ThreadLocalRandom.current().nextInt(this.virtualKeyCount), value);
}

The "BroadcastFilterInformation" is a flatMap which emits a Tuple2<Integer, FilterInfo> 20 times (with f0 from 0 to 19) every time it receives a new FilterInformation:

@Override
public void flatMap(FilterInfo filterInfo, Collector<Tuple2<Integer, FilterInfo>> collector) throws Exception {
    if (this.currentLatestTimestamp < filterInfo.getLastUpdateTime()) {
        this.currentLatestTimestamp = filterInfo.getLastUpdateTime();

        for (int i = 0; i < this.broadcastCount; i++) {
            collector.collect(new Tuple2<>(i, filterInfo));
        }
    }
}

I now connect both streams and key them by their "virtual key" (Tuple2.f0). I keep 20 copies of my FilterInfo in keyed state in "ConnectAndApplyFilterFunction" (ID = 7).

This works, and I can use parallelism on my main path. But why do I use 20 "virtual keys" while my parallelism is only 4? Because with only 4 keys, not all parallel operator instances are used (2 instances were not receiving any data in my test).
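The uneven distribution described above can be reproduced with a small plain-Java model. As far as I understand it, Flink does not map a key directly to a parallel instance: it hashes the key into a "key group" (one of maxParallelism buckets) and then assigns contiguous ranges of key groups to the instances. The sketch below follows that scheme, but note the assumptions: the mix function is a generic stand-in (the MurmurHash3 32-bit finalizer), not Flink's actual hash, and 128 is assumed as the maximum parallelism. The class and method names are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

public class KeySpreadModel {

    // Stand-in integer mixer (MurmurHash3 32-bit finalizer).
    // Assumption: this is NOT Flink's actual hash function.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // key -> key group -> index of the parallel instance (subtask).
    static int subtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(Integer.hashCode(key)), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // The subtasks that receive at least one of the keys 0..keyCount-1.
    static Set<Integer> usedSubtasks(int keyCount, int maxParallelism, int parallelism) {
        Set<Integer> used = new TreeSet<>();
        for (int key = 0; key < keyCount; key++) {
            used.add(subtaskFor(key, maxParallelism, parallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        // Assumed setup: parallelism 4, maximum parallelism 128.
        for (int keys : new int[] {4, 20, 100}) {
            System.out.println(keys + " keys -> subtasks " + usedSubtasks(keys, 128, 4));
        }
    }
}
```

Because the key-to-instance mapping goes through a hash, nothing guarantees that N keys land on N different instances. Using more keys than instances only makes it more likely that every instance gets some work; it does not guarantee it.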

Is there any way I can broadcast some data from one stream so that every operator on the other end receives its own copy?


Solution

  • You can most likely use Flink's broadcast options to make the data available to all parallel instances of an operator.

    For batch processing, you can use broadcast variables. The linked documentation, which also includes an example, describes them as follows:

    Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular input of the operation. This is useful for auxiliary data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.

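    For stream processing, you can call broadcast() on the stream you want to distribute (here, the FilterInformation stream) before connecting it to the other stream, for example aggregatedData.connect(filterInfo.broadcast()), where both variable names are placeholders.

    According to the Flink documentation, broadcast() broadcasts elements (from one stream) to every partition, so each parallel instance of the downstream operator receives its own copy.

    In the streaming case, keep race conditions in mind: elements from the two streams can arrive in any order, so an instance may see data before it has received the first (or the latest) filter.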

    A sample code can be checked out here
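Independent of the linked sample, the race-condition point can be illustrated with a plain-Java model that has no Flink dependency. It is only a sketch under assumptions: Filter is a hypothetical stand-in for the question's FilterInfo, each Instance plays the role of one parallel instance of the connected operator, and all class and method names are invented for illustration. Data that arrives before any filter is buffered, and older filter updates are ignored using the same timestamp comparison the question applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BroadcastFilterModel {

    // Hypothetical stand-in for FilterInfo: an update timestamp plus a predicate.
    static final class Filter {
        final long lastUpdateTime;
        final Predicate<String> accept;

        Filter(long lastUpdateTime, Predicate<String> accept) {
            this.lastUpdateTime = lastUpdateTime;
            this.accept = accept;
        }
    }

    // One parallel instance of the connected operator with its own filter copy.
    static final class Instance {
        private Filter current;                                  // null until the first filter arrives
        private final List<String> pending = new ArrayList<>();  // data seen before any filter
        final List<String> output = new ArrayList<>();

        // Data input: buffer while no filter is known, otherwise filter immediately.
        void onData(String value) {
            if (current == null) {
                pending.add(value);
            } else if (current.accept.test(value)) {
                output.add(value);
            }
        }

        // Broadcast input: keep only the newest filter, then drain buffered data.
        void onFilter(Filter filter) {
            if (current != null && filter.lastUpdateTime <= current.lastUpdateTime) {
                return; // stale or duplicate update
            }
            current = filter;
            for (String value : pending) {
                if (current.accept.test(value)) {
                    output.add(value);
                }
            }
            pending.clear();
        }
    }

    // Broadcast partitioning in miniature: every instance receives the same element.
    static void broadcast(List<Instance> instances, Filter filter) {
        for (Instance instance : instances) {
            instance.onFilter(filter);
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            instances.add(new Instance());
        }
        instances.get(0).onData("apple");   // arrives before any filter: buffered
        instances.get(1).onData("banana");  // buffered as well
        broadcast(instances, new Filter(1L, v -> v.startsWith("a")));
        instances.get(0).onData("avocado"); // a filter is known: checked immediately
        for (int i = 0; i < instances.size(); i++) {
            System.out.println("instance " + i + " -> " + instances.get(i).output);
        }
    }
}
```

In a Flink job, the two handlers would roughly correspond to flatMap1 and flatMap2 of a CoFlatMapFunction applied to the connected streams. The current filter and the buffer would additionally have to live in operator state to survive failures, and the buffer should be bounded in some way, since it grows for as long as no filter has arrived.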