How are timestamps treated within an iterative DataStream loop in Flink?
For example, here is a simple iterative loop in Flink where the feedback stream has a different type from the input stream:
DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
        inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(
        new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }
    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
My questions revolve around how Flink uses timestamps within a feedback loop:
With ConnectedIterativeStreams, how does Flink treat the ordering of input objects across the regular input stream and the feedback stream? If I emit an object into the feedback loop, when will the head of the loop see it relative to the regular stream of input objects?
AFAICT, Flink doesn't provide any guarantees on the ordering of input objects across the two streams. I ran into this when trying to use iterations for a clustering algorithm in Flink, where the centroid updates weren't processed in a timely manner. The only solution I found was to create a single (unioned) stream of the incoming events and the centroid updates, rather than using a co-stream.
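To make the "single unioned stream" idea concrete, here is a minimal, Flink-free sketch in plain Java. It is not the code from the clustering job: the `Message` envelope, the `Handler` class, and the 1-D "distance to centroid" logic are all hypothetical names chosen for illustration. The assumption is that in a real job both the input events and the centroid updates would be mapped to one such envelope type and merged with `DataStream.union`, so that a single function sees both kinds of element in one sequence instead of in two separate `processElement1`/`processElement2` callbacks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UnionedStreamSketch {

    /** Hypothetical envelope type: one element of the single, merged stream. */
    static final class Message {
        final boolean isCentroidUpdate; // true: feedback element, false: regular input
        final double value;             // 1-D point or new centroid, kept scalar for brevity

        private Message(boolean isCentroidUpdate, double value) {
            this.isCentroidUpdate = isCentroidUpdate;
            this.value = value;
        }

        static Message event(double point) {
            return new Message(false, point);
        }

        static Message centroidUpdate(double centroid) {
            return new Message(true, centroid);
        }
    }

    /** Single handler that owns the state touched by both kinds of message. */
    static final class Handler {
        private double centroid = 0.0; // assumed initial centroid
        private final List<Double> distances = new ArrayList<>();

        void handle(Message m) {
            if (m.isCentroidUpdate) {
                // The update takes effect before any later message is handled.
                centroid = m.value;
            } else {
                // Regular event: emit its distance to the current centroid.
                distances.add(Math.abs(m.value - centroid));
            }
        }
    }

    /** Feeds the merged sequence through one handler, in arrival order. */
    static List<Double> run(List<Message> merged) {
        Handler handler = new Handler();
        for (Message m : merged) {
            handler.handle(m);
        }
        return handler.distances;
    }

    public static void main(String[] args) {
        List<Double> out = run(Arrays.asList(
                Message.event(1.0),
                Message.centroidUpdate(10.0),
                Message.event(12.0)));
        System.out.println(out); // prints [1.0, 2.0]
    }
}
```

Note that the envelope alone does not create an ordering guarantee: the handler simply applies updates in whatever order the merged sequence delivers them. What it buys is a single code path and a single piece of state for both element types.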
FYI, there's this proposal to address some of the shortcomings of iterations.