I have two streams in Flink: stream1 has 70,000 records per second, and stream2 may or may not have data.
    // Ingest the High Frequency Analog Stream
    SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
        environment
            .addSource(createHFAConsumer())
            .name("hfa source");

    SingleOutputStreamOperator<EVWindow> stream2 =
        environment
            .addSource(createHFDConsumer())
            .name("hfd source");

    DataStream<Message> pStream =
        stream1
            .coGroup(stream2)
            .where(obj -> obj.getid())
            .equalTo(ev -> ev.getid())
            .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .apply(new CalculateCoGroupFunction());
This works perfectly fine when both streams have data, but when stream2 has no data the job fails with very high backpressure. The CPU utilization also spikes by 200%.
How do I handle an outer join in such a scenario?
Thanks to David Anderson for the pointers.
RCA:
The main issue came when I tried to create a tumbling window around my stream.
As per the Flink documentation:
In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness
Since there was no incoming data for stream2, its watermark never advanced, so the window could never fire and be removed. As David pointed out:
Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks
This means Flink was buffering data from stream1 while waiting for stream2, which would eventually result in high backpressure and finally an OOM.
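To make the watermark rule above concrete, here is a minimal sketch in plain Java. It is a toy model rather than Flink code: the class and method names are made up for illustration, and it only assumes that an input which has produced nothing yet sits at Long.MIN_VALUE and that an event-time window fires once the watermark reaches the window's last timestamp (end minus one millisecond).

```java
import java.util.Arrays;

// Toy model, not Flink code: an operator with several inputs forwards the
// minimum of the latest watermark it has seen on each input.
class CombinedWatermarkSketch {

    // Watermark of an input that has not emitted anything yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The combined watermark is the minimum over all input watermarks.
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    // An event-time window can fire only once the combined watermark has
    // reached the last timestamp that still belongs to the window.
    static boolean windowCanFire(long windowEndMillis, long... inputWatermarks) {
        return combinedWatermark(inputWatermarks) >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // window covering [0, 60000)

        // stream1 is far ahead, stream2 is silent: the window stays open
        // and everything from stream1 keeps being buffered.
        System.out.println(windowCanFire(windowEnd, 120_000L, NO_WATERMARK)); // false

        // Once stream2 also reports progress, the window can fire.
        System.out.println(windowCanFire(windowEnd, 120_000L, 60_000L)); // true
    }
}
```

In this model, no amount of progress on stream1 helps while stream2 stays at its initial value, which matches the buffering behaviour I saw.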
The solution:
I created an external script to send dummy heartbeat messages to the Kafka stream stream2 at the desired interval, and added logic in my application to ignore these messages during computation.
This forced both stream2 and stream1 to advance their watermarks, so the window could fire and be removed.
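The "ignore these messages" part could look roughly like the sketch below. It is not my actual code: the Event class, its heartbeat flag, and the helper name are hypothetical stand-ins for EVWindow and for however a heartbeat is marked in the payload. The idea is that the heartbeat still carries a timestamp, so it moves the watermark forward, but it is dropped before any calculation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HeartbeatFilterSketch {

    // Hypothetical stand-in for the stream2 element type.
    static final class Event {
        final String id;
        final long timestampMillis;
        final boolean heartbeat; // assumed marker set by the external script

        Event(String id, long timestampMillis, boolean heartbeat) {
            this.id = id;
            this.timestampMillis = timestampMillis;
            this.heartbeat = heartbeat;
        }
    }

    // Returns only the real events from the stream2 side of a window.
    static List<Event> withoutHeartbeats(Iterable<Event> stream2Side) {
        List<Event> real = new ArrayList<>();
        for (Event e : stream2Side) {
            if (!e.heartbeat) {
                real.add(e);
            }
        }
        return real;
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
            new Event("heartbeat", 1_000L, true),
            new Event("sensor-1", 2_000L, false));

        // Only the real event is left for the computation.
        System.out.println(withoutHeartbeats(window).size()); // 1
    }
}
```

Inside a CoGroupFunction, a helper like this would be applied to the second Iterable passed to coGroup before doing the calculation. If the result is empty, the function can treat the window as having no stream2 match, which gives the outer join behaviour.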