java, streaming, apache-flink, backpressure

Flink coGroup outer join fails with high backpressure


I have two streams in Flink: stream1 receives about 70,000 records per second, and stream2 may or may not have data.

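// Ingest the High Frequency Analog stream (~70,000 records/sec)
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 = environment
        .addSource(createHFAConsumer())
        .name("hfa source");

// Second stream; it may stay empty for long periods
SingleOutputStreamOperator<EVWindow> stream2 = environment
        .addSource(createHFDConsumer())
        .name("hfd source");

// Outer-join both streams by id inside tumbling event-time windows.
// Event-time windows require timestamps/watermarks to be assigned upstream (not shown).
DataStream<Message> pStream = stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
        .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        // With a tumbling window of the same length, this TimeEvictor removes nothing.
        .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());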

This works perfectly fine when both streams have data, but when stream2 has no data, the job fails with very high backpressure, and CPU utilization spikes by 200%.

How do I handle an outer join in such a scenario?


Solution

  • Thanks to David Anderson for the pointers.

    RCA (root cause analysis):

    The main issue arose when I created a tumbling window over the co-grouped streams.

    As per the Flink documentation:

    "In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness."
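To make the quoted lifecycle concrete, here is a small plain-Java model (no Flink dependency) of one tumbling event-time window with offset 0. The class and method names are hypothetical and this is not Flink's API; the arithmetic follows my understanding of Flink's defaults: a window covers [start, start + size), the event-time trigger fires once the watermark reaches end - 1, and the window state is cleaned up once the watermark reaches end - 1 + allowed lateness.

```java
/**
 * Hypothetical, dependency-free model of a tumbling event-time window's lifecycle.
 * Not Flink's API: it only reproduces the arithmetic for offset 0 and non-negative timestamps.
 */
final class TumblingWindowModel {

    /** Start of the tumbling window that contains {@code timestamp}. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Exclusive end of that window; the last timestamp it covers is end - 1. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    /** The event-time trigger fires once the watermark reaches the window's last timestamp. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** Window state is dropped once the watermark reaches the last timestamp plus allowed lateness. */
    static boolean isCleanedUp(long windowEnd, long allowedLateness, long watermark) {
        return watermark >= windowEnd - 1 + allowedLateness;
    }

    public static void main(String[] args) {
        long size = 60_000L;                     // a 1-minute window, in milliseconds
        long end = windowEnd(125_000L, size);    // an element at t=125s lands in [120s, 180s)
        System.out.println(end);                 // prints 180000
        // A watermark stuck at Long.MIN_VALUE never reaches the window end,
        // so the window neither fires nor is cleaned up, and its elements stay buffered.
        System.out.println(fires(end, Long.MIN_VALUE));           // prints false
        System.out.println(isCleanedUp(end, 0L, Long.MIN_VALUE)); // prints false
    }
}
```

The point of the model is the last two lines: creating a window only needs one element, but getting rid of it needs the watermark to move.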

    Since there was no incoming data for stream2, its watermark never advanced, so the windows were never fired or removed. As David pointed out:

    "Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks."

    That meant Flink kept buffering data from stream1 while waiting for stream2's watermark to advance, which eventually led to high backpressure and finally an OOM (out-of-memory error).
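The effect of the minimum rule can be sketched in plain Java. This is a hypothetical model, not Flink code: a two-input operator remembers the latest watermark from each input, forwards the minimum, and keeps every element buffered until its tumbling window fires (allowed lateness 0). Following Flink's convention, an input that has not emitted any watermark yet is treated as being at Long.MIN_VALUE; the window size and timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a two-input window operator: the combined watermark is the
 * minimum of both input watermarks, and elements stay buffered until their window fires.
 */
final class TwoInputWatermarkModel {
    private final long windowSize;
    private long watermark1 = Long.MIN_VALUE; // no watermark seen yet on input 1
    private long watermark2 = Long.MIN_VALUE; // no watermark seen yet on input 2
    private final List<Long> buffered = new ArrayList<>(); // timestamps of buffered elements

    TwoInputWatermarkModel(long windowSize) {
        this.windowSize = windowSize;
    }

    long combinedWatermark() {
        return Math.min(watermark1, watermark2);
    }

    void onElement1(long timestamp) {
        buffered.add(timestamp);
    }

    void onWatermark1(long wm) {
        watermark1 = Math.max(watermark1, wm);
        fireCompletedWindows();
    }

    void onWatermark2(long wm) {
        watermark2 = Math.max(watermark2, wm);
        fireCompletedWindows();
    }

    int bufferedCount() {
        return buffered.size();
    }

    /** Fires (and clears) every window whose last timestamp is at or below the combined watermark. */
    private void fireCompletedWindows() {
        long wm = combinedWatermark();
        buffered.removeIf(ts -> ts - (ts % windowSize) + windowSize - 1 <= wm);
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel(60_000L);
        for (long t = 0; t < 300_000L; t += 1_000L) {
            op.onElement1(t);                   // stream1 keeps producing
        }
        op.onWatermark1(299_999L);              // stream1's watermark advances...
        System.out.println(op.combinedWatermark() == Long.MIN_VALUE); // true: stream2 is silent
        System.out.println(op.bufferedCount());                       // 300: nothing has fired
        op.onWatermark2(299_999L);              // ...but only this releases the windows
        System.out.println(op.bufferedCount());                       // 0
    }
}
```

In the real job the buffer is keyed window state rather than a list, but the growth pattern is the same: with stream2 silent, everything stream1 sends is retained.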

    The solution:

    I created an external script that sends dummy heartbeat messages to the Kafka topic feeding stream2 at the desired interval, and added logic in my application to ignore these messages during computation.

    This forced stream2's watermark, and therefore the combined watermark of stream1 and stream2, to advance, so the windows fired and were cleaned up.
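The heartbeat workaround can be sketched the same way. This is a hypothetical plain-Java model, not the author's code: the Ev record, its heartbeat flag, and the 5-second out-of-orderness bound are all assumptions. Every element on stream2, heartbeats included, advances that input's watermark, computed as max timestamp minus the bound minus 1 (mirroring Flink's built-in bounded-out-of-orderness strategy), while only real messages are kept for the join.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of stream2 with heartbeats: every element advances the watermark,
 * but heartbeats are filtered out before the join logic sees them.
 */
final class HeartbeatStreamModel {
    /** Hypothetical stream2 element; heartbeat == true marks the dummy messages. */
    record Ev(String id, long timestamp, boolean heartbeat) {}

    private final long outOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE; // nothing seen yet
    private final List<Ev> forJoin = new ArrayList<>();

    HeartbeatStreamModel(long outOfOrderness) {
        this.outOfOrderness = outOfOrderness;
    }

    void onElement(Ev ev) {
        // 1. watermark generation sees every element, heartbeats included
        maxTimestamp = Math.max(maxTimestamp, ev.timestamp());
        // 2. only afterwards are heartbeats dropped from the computation
        if (!ev.heartbeat()) {
            forJoin.add(ev);
        }
    }

    /** Bounded-out-of-orderness watermark; stays at Long.MIN_VALUE until an element arrives. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - outOfOrderness - 1;
    }

    List<Ev> elementsForJoin() {
        return forJoin;
    }

    public static void main(String[] args) {
        HeartbeatStreamModel stream2 = new HeartbeatStreamModel(5_000L);
        System.out.println(stream2.currentWatermark() == Long.MIN_VALUE); // true: silent stream
        stream2.onElement(new Ev("heartbeat", 190_000L, true));
        System.out.println(stream2.currentWatermark());       // 184999
        System.out.println(stream2.elementsForJoin().size()); // 0
    }
}
```

With a 1-minute window, that single heartbeat moves stream2's watermark to 184,999 ms, past the last timestamp (179,999 ms) of the window [120 s, 180 s), so the window can fire as soon as stream1's watermark is there too. The ordering is the important design choice: heartbeats have to pass through watermark generation before they are filtered out, otherwise they cannot move the watermark.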