Tags: events, apache-kafka, apache-flink, flink-streaming

Order events by event time from multiple Kafka topics


We are trying to order, by event time, the events that we consume from three Kafka topics. Each source topic has three partitions, and we set the Flink parallelism to three as well. After reading the events from Kafka, we map each event into a generic format and union the streams. Lastly, we key the unioned stream by a shared eventID and try to order the events using a ProcessFunction as explained here.

Flink High Level Jobgraph

We assign the event time to the events like the following:

WatermarkStrategy<String> watermarkStrategy = 
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(10))
    .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
        @Override
        public long extractTimestamp(String element, long recordTimestamp) {
            ...
        }
    });

KafkaSource<String> topic = KafkaSource.<String>builder()
    .setBootstrapServers("...")
    .setTopics("my_topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStreamSource<String> sourceStream = env.fromSource(topic, watermarkStrategy, "Kafka Source");

The (simplified) process function looks like this:


public class OrderProcessFunction extends KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>> {

    private transient MapState<Long, List<Tuple2<LocalDateTime, String>>> queueState = null;

    @Override
    public void open(Configuration config) {
        TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
        TypeInformation<List<Tuple2<LocalDateTime, String>>> value = TypeInformation.of(new TypeHint<List<Tuple2<LocalDateTime, String>>>() {});
        queueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("events-by-timestamp", key, value));
    }

    @Override
    public void processElement(Tuple2<LocalDateTime, String> event,
                               KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.Context ctx,
                               Collector<Tuple2<LocalDateTime, String>> out) throws Exception {

        TimerService timerService = ctx.timerService();
        if (ctx.timestamp() > timerService.currentWatermark()) {

            List<Tuple2<LocalDateTime, String>> listEvents = queueState.get(ctx.timestamp());
            if (isEmpty(listEvents)) {
                listEvents = new ArrayList<>();
            }

            listEvents.add(event);
            queueState.put(ctx.timestamp(), listEvents);
            timerService.registerEventTimeTimer(ctx.timestamp());
        } else {
            // Event considered late, write to side output to debug
            ctx.output(sideOutputLateEventsProcessFunction, event);
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Tuple2<LocalDateTime, String>, Tuple2<LocalDateTime, String>>.OnTimerContext ctx,
                        Collector<Tuple2<LocalDateTime, String>> out) throws Exception {
        queueState.get(timestamp).forEach(out::collect);
        queueState.remove(timestamp);
    }
}
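
To make the intended buffering behaviour easier to reason about outside of Flink, here is a minimal plain-Java sketch of the same idea: events are held in a map sorted by timestamp and released once a watermark passes them, and anything at or below the current watermark is treated as late. This is only a toy model, not Flink code. The class `OrderingBufferSketch` and its methods are hypothetical names, and a single buffer stands in for what would be per-key state in the real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (NOT Flink code) of "buffer until the watermark passes". */
final class OrderingBufferSketch {
    // Events grouped by timestamp, kept sorted by the TreeMap.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private final List<String> lateEvents = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers the event, or records it as late if the watermark already passed it. */
    void onEvent(long timestamp, String event) {
        if (timestamp > currentWatermark) {
            buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        } else {
            lateEvents.add(event);
        }
    }

    /** Advances the watermark and returns, in timestamp order, all events at or below it. */
    List<String> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<String> released = new ArrayList<>();
        // headMap(..., true) is a live view of all entries with key <= watermark.
        Map<Long, List<String>> ready = buffer.headMap(currentWatermark, true);
        ready.values().forEach(released::addAll);
        ready.clear(); // clearing the view removes the entries from the buffer
        return released;
    }

    List<String> lateEvents() {
        return lateEvents;
    }

    public static void main(String[] args) {
        OrderingBufferSketch sketch = new OrderingBufferSketch();
        sketch.onEvent(30L, "c");
        sketch.onEvent(10L, "a");
        sketch.onEvent(20L, "b");
        System.out.println(sketch.advanceWatermark(20L));
        sketch.onEvent(15L, "too-late");
        System.out.println(sketch.lateEvents());
    }
}
```

In this model, an event only ends up in `lateEvents` if the watermark has already moved past its timestamp, which is why a watermark that advances too early produces many late events.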

The topics already contain historic events from the last half year, which we also want to order. One topic in particular has far more events than the other two, which, to our understanding, would cause the process function to buffer all events until watermarks have arrived from each Flink Kafka Source.

However, when we start the Flink job, only a few events are correctly ordered, but most of them (for all streams) are quickly considered late in the ProcessFunction and therefore dropped.

We have the following questions:

  • We understand from the Flink Kafka Source documentation that the watermarks are computed across all partitions. That would mean that Flink first needs to process at least one event per partition, after which a watermark for that partition is generated. The Flink Kafka Source would then emit a watermark that is the minimum across all partition watermarks. Is that correct? Does this also work with a SerializableTimestampAssigner? Currently it seems that it doesn't.
  • What's the best way to debug the event time / watermarking mechanism of the Flink Kafka Source to understand why the processing function drops most of the events? We are currently using side outputs for the late data and the Flink WebUI for the watermarks.
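
For reference, the behaviour asked about in the first question can be modelled in a few lines of plain Java. This is only a sketch of our understanding, not Flink's actual implementation: each partition tracks the highest timestamp it has seen, its watermark trails that value by the allowed out-of-orderness, and the combined watermark is the minimum over all partitions. The class `PartitionWatermarkSketch` is a hypothetical name, and the extra `- 1` assumes the bounded-out-of-orderness convention that a watermark `w` means "no more events with timestamp <= w".

```java
import java.util.Arrays;

/** Toy model (NOT Flink code) of per-partition watermarks merged by minimum. */
final class PartitionWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp seen per partition; Long.MIN_VALUE means "no event yet".
    private final long[] maxTimestampSeen;

    PartitionWatermarkSketch(int partitions, long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxTimestampSeen = new long[partitions];
        Arrays.fill(maxTimestampSeen, Long.MIN_VALUE);
    }

    void onEvent(int partition, long eventTimestampMs) {
        maxTimestampSeen[partition] = Math.max(maxTimestampSeen[partition], eventTimestampMs);
    }

    /** Watermark of one partition; stays at Long.MIN_VALUE until it has seen an event. */
    long partitionWatermark(int partition) {
        long max = maxTimestampSeen[partition];
        return max == Long.MIN_VALUE ? Long.MIN_VALUE : max - maxOutOfOrdernessMs - 1;
    }

    /** Combined watermark: the minimum over all partition watermarks. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTimestampSeen.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }

    public static void main(String[] args) {
        // Three partitions, ten minutes of allowed out-of-orderness (as in the question).
        PartitionWatermarkSketch sketch = new PartitionWatermarkSketch(3, 600_000L);
        sketch.onEvent(0, 1_000_000L);
        sketch.onEvent(1, 2_000_000L);
        System.out.println("before partition 2 has data: " + sketch.combinedWatermark());
        sketch.onEvent(2, 5_000_000L);
        System.out.println("after all partitions have data: " + sketch.combinedWatermark());
    }
}
```

Under this model, the combined watermark cannot advance until every partition has produced at least one event, and afterwards it is held back by the slowest partition.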

Thank you!


Solution

  • We realised that we were hitting this bug in Flink, which was only fixed in later versions.

    We configured idleness for all Flink Kafka consumers, but the bug incorrectly marked the stream consumers as idle. This led to the many late events in the process function.
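
    Why a wrongly idle input leads to late events can be illustrated with a small plain-Java model. It is a sketch under assumptions, not Flink's implementation: inputs marked idle are simply left out of the minimum, so the combined watermark jumps ahead of the input that still holds old data. The class `IdleSourceSketch` and its methods are hypothetical names, and returning `Long.MIN_VALUE` when every input is idle is a simplification of this model.

```java
/** Toy model (NOT Flink code) of how idle inputs are excluded from the watermark minimum. */
final class IdleSourceSketch {

    /** Minimum watermark over the inputs that are not marked idle. */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        // Simplification of this model: with no active input the watermark does not advance.
        return anyActive ? min : Long.MIN_VALUE;
    }

    /** Same lateness rule as the process function in the question. */
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000L, 9_000L, 9_500L};
        long nextEventFromInput0 = 2_000L; // input 0 still has old events to deliver

        long healthy = combinedWatermark(watermarks, new boolean[] {false, false, false});
        long buggy = combinedWatermark(watermarks, new boolean[] {true, false, false});

        System.out.println("late with all inputs active: " + isLate(nextEventFromInput0, healthy));
        System.out.println("late with input 0 wrongly idle: " + isLate(nextEventFromInput0, buggy));
    }
}
```

    With all inputs active, the slow input holds the watermark back and its events are still on time. Once it is wrongly marked idle, the watermark follows the faster inputs and the remaining events of the slow input arrive behind it.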