We are trying to order, by event time, the events that we consume from three Kafka topics. Each source topic has three partitions, and we also set the Flink parallelism to three. After reading the events from Kafka, we map each event into a generic format and union the streams. Lastly, we key the unioned stream by a shared eventID and try to order the events using a ProcessFunction as explained here.
We assign event time to the events as follows:
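```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

// Watermarks trail the highest timestamp seen so far by 10 minutes.
WatermarkStrategy<String> watermarkStrategy =
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(10))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        ...
                    }
                });

KafkaSource<String> topic = KafkaSource.<String>builder()
        .setBootstrapServers("...")
        .setTopics("my_topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest()) // include the historic events
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStreamSource<String> sourceStream = env.fromSource(topic, watermarkStrategy, "Kafka Source");
```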
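`forBoundedOutOfOrderness(Duration.ofMinutes(10))` means every watermark trails the largest timestamp seen so far by the configured bound. The plain-Java class below is a simplified, Flink-free model of that rule (`OutOfOrdernessModel` is a hypothetical name, not a Flink API), following the formula of Flink's `BoundedOutOfOrdernessWatermarks`, which emits `maxTimestamp - outOfOrdernessMillis - 1`. It shows that a single event with a high timestamp moves the watermark forward, after which an event more than the bound older counts as late.

```java
import java.time.Duration;

// Hypothetical, simplified model of the bounded-out-of-orderness rule (not Flink code).
class OutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    OutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that the first watermark is Long.MIN_VALUE, as in Flink.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every record (cf. WatermarkGenerator#onEvent).
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Flink emits this value periodically (cf. WatermarkGenerator#onPeriodicEmit).
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Same test as the process function: late if timestamp <= watermark.
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}

class OutOfOrdernessDemo {
    public static void main(String[] args) {
        OutOfOrdernessModel model = new OutOfOrdernessModel(Duration.ofMillis(100));
        model.onEvent(1_000);
        model.onEvent(5_000);
        System.out.println(model.currentWatermark()); // 4899
        System.out.println(model.isLate(3_000));      // true: older than the watermark
        System.out.println(model.isLate(4_950));      // false: still within the bound
    }
}
```

With the real 10-minute bound the arithmetic is the same, only the numbers are larger: an event is late once it is more than about 10 minutes older than the newest event that has already driven the watermark.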
The (simplified) process function looks like this:
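```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class OrderProcessFunction extends KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>> {
    // Side output for late events; declared here so the snippet compiles (tag id is illustrative).
    static final OutputTag<Tuple2<LocalDateTime, String>> sideOutputLateEventsProcessFunction =
            new OutputTag<Tuple2<LocalDateTime, String>>("late-events") {};

    // Buffered events, grouped by their event-time timestamp.
    private transient MapState<Long, List<Tuple2<LocalDateTime, String>>> queueState = null;

    @Override
    public void open(Configuration config) {
        TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
        TypeInformation<List<Tuple2<LocalDateTime, String>>> value =
                TypeInformation.of(new TypeHint<List<Tuple2<LocalDateTime, String>>>() {});
        queueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("events-by-timestamp", key, value));
    }

    @Override
    public void processElement(Tuple2<LocalDateTime, String> event,
                               KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.Context ctx,
                               Collector<Tuple2<LocalDateTime, String>> out) throws Exception {
        TimerService timerService = ctx.timerService();
        if (ctx.timestamp() > timerService.currentWatermark()) {
            // Not late: buffer the event and fire a timer once the watermark reaches its timestamp.
            List<Tuple2<LocalDateTime, String>> listEvents = queueState.get(ctx.timestamp());
            if (listEvents == null) { // MapState#get returns null for an absent key
                listEvents = new ArrayList<>();
            }
            listEvents.add(event);
            queueState.put(ctx.timestamp(), listEvents);
            timerService.registerEventTimeTimer(ctx.timestamp());
        } else {
            // Event considered late, write to side output to debug
            ctx.output(sideOutputLateEventsProcessFunction, event);
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.OnTimerContext ctx,
                        Collector<Tuple2<LocalDateTime, String>> out) throws Exception {
        // Timers fire in timestamp order, so emitting each bucket here yields ordered output.
        queueState.get(timestamp).forEach(out::collect);
        queueState.remove(timestamp);
    }
}
```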
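To reason about what the process function does for one key without running a cluster, the plain-Java sketch below models it under simplifying assumptions: a single key, `String` payloads instead of `Tuple2<LocalDateTime, String>`, and an explicit `advanceWatermark` call standing in for Flink firing event-time timers (a timer for timestamp `t` fires once the watermark reaches `t`). `EventOrderBuffer` is a hypothetical name; the `TreeMap` plays the role of both the `MapState` and the registered timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical single-key model of OrderProcessFunction (not Flink code).
class EventOrderBuffer {
    private final TreeMap<Long, List<String>> queue = new TreeMap<>();
    private final List<String> lateEvents = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Mirrors processElement: buffer if newer than the watermark, otherwise mark as late.
    boolean processElement(long timestamp, String event) {
        if (timestamp > currentWatermark) {
            queue.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
            return true;
        }
        lateEvents.add(event);
        return false;
    }

    // Mirrors timers firing: emit every bucket with timestamp <= watermark, oldest first.
    List<String> advanceWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return emitted; // watermarks never move backwards
        }
        currentWatermark = watermark;
        NavigableMap<Long, List<String>> due = queue.headMap(watermark, true);
        for (List<String> bucket : due.values()) {
            emitted.addAll(bucket);
        }
        due.clear(); // removes the emitted buckets from the backing map
        return emitted;
    }

    List<String> lateEvents() {
        return lateEvents;
    }

    int bufferedCount() {
        int count = 0;
        for (List<String> bucket : queue.values()) {
            count += bucket.size();
        }
        return count;
    }
}

class EventOrderBufferDemo {
    public static void main(String[] args) {
        EventOrderBuffer buffer = new EventOrderBuffer();
        buffer.processElement(300L, "c");
        buffer.processElement(100L, "a");
        buffer.processElement(200L, "b");
        System.out.println(buffer.advanceWatermark(250L));    // [a, b]
        System.out.println(buffer.advanceWatermark(400L));    // [c]
        System.out.println(buffer.processElement(150L, "x")); // false: 150 <= 400, so "x" is late
    }
}
```

The model makes the failure mode explicit: ordering only works while the watermark lags behind the events still in flight; as soon as it overtakes them, they go to the late list instead of the buffer.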
The topics already contain historic events from the last half year, which we also want to order. One topic in particular holds far more events than the other two. Since the watermark of the unioned stream is the minimum of the watermarks of all its inputs, our understanding is that this imbalance should only cause the process function to buffer events until watermarks have arrived from every Flink Kafka source.
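This expectation rests on how Flink combines watermarks: after the union and `keyBy`, the process function receives data from every parallel subtask of all three sources, and it uses the minimum of their watermarks while never letting its own watermark go backwards. The plain-Java sketch below is a simplified model of that rule (`MinWatermarkCombiner` is a hypothetical name, and idleness is deliberately left out). As long as the backlog-heavy topic reports an old watermark, the combined watermark stays old, so events should be buffered rather than classified as late.

```java
import java.util.Arrays;

// Hypothetical model of "operator watermark = min over input watermarks" (no idleness).
class MinWatermarkCombiner {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark received yet
    }

    // Record a new watermark from one input and return the combined watermark.
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        combined = Math.max(combined, min); // the combined watermark is monotonic
        return combined;
    }
}

class MinWatermarkDemo {
    public static void main(String[] args) {
        // Input 0: the topic with the large backlog; inputs 1 and 2: the smaller topics.
        MinWatermarkCombiner combiner = new MinWatermarkCombiner(3);
        combiner.onWatermark(1, 9_000L);
        combiner.onWatermark(2, 8_000L);
        System.out.println(combiner.onWatermark(0, 1_000L)); // 1000: held back by input 0
        System.out.println(combiner.onWatermark(0, 8_500L)); // 8000: input 2 is now the slowest
    }
}
```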
However, when we start the Flink job, only a few events are correctly ordered; most of them (from all streams) are quickly considered late in the ProcessFunction and are therefore dropped from the main output.
We have the following questions:
SerializableTimestampAssigner
, because it currently seems it doesn't.
Thank you!
We realised that we were hitting this bug in Flink, which was only fixed in later versions.
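We had configured idleness for all Flink Kafka consumers, but because of the bug the consumers were incorrectly marked as idle. Idle inputs are excluded when Flink computes the minimum watermark, so the watermark advanced with the remaining active inputs, and events from the inputs wrongly marked idle arrived behind it. This led to the many late events in the process function.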
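The effect described here can be reproduced with a simplified plain-Java model of how idleness interacts with the minimum-watermark rule. This is an illustration under stated assumptions, not Flink's implementation: `IdleAwareWatermarkModel` is a hypothetical name, and the model simply excludes idle inputs from the minimum and keeps the combined watermark monotonic. Once the backlog-heavy input is (wrongly) marked idle, the combined watermark jumps to the value of the faster inputs, and that input's older events are late when it resumes.

```java
import java.util.Arrays;

// Hypothetical model: idle inputs are ignored when taking the minimum watermark.
class IdleAwareWatermarkModel {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkModel(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[inputs];
    }

    // A watermark from an input also marks that input as active again.
    long onWatermark(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
        return recompute();
    }

    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= combined;
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Simplification: with no active input, keep the last value; never move backwards.
        if (anyActive && min > combined) {
            combined = min;
        }
        return combined;
    }
}

class IdlenessDemo {
    public static void main(String[] args) {
        IdleAwareWatermarkModel model = new IdleAwareWatermarkModel(2);
        model.onWatermark(0, 1_000L);                      // backlog-heavy input, far behind
        System.out.println(model.onWatermark(1, 9_000L)); // 1000
        System.out.println(model.markIdle(0));             // 9000: input 0 no longer holds it back
        model.onWatermark(0, 2_000L);                      // input 0 resumes, still behind
        System.out.println(model.isLate(1_500L));          // true
    }
}
```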