Tags: apache-kafka, apache-flink, flink-streaming

Migrating from FlinkKafkaConsumer to KafkaSource, no windows executed


I have implemented a FlinkKafkaConsumer to consume messages from a Kafka topic. The only custom setting besides the group and topic is (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), which lets me re-read the same messages several times. Consumption and the downstream logic work out of the box.

Now FlinkKafkaConsumer is deprecated, and I wanted to update to its successor KafkaSource.

Initializing KafkaSource with the same parameters I use for FlinkKafkaConsumer reads the topic as expected; I can verify this by printing the stream. Deserialization and timestamps appear to work fine. However, the windows are never executed, and as such no results are produced.

I assume some default setting(s) in KafkaSource differ from those of FlinkKafkaConsumer, but I have no idea what they might be.

KafkaSource - Not working

KafkaSource<TestData> source = KafkaSource.<TestData>builder()
     .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
     .setTopics(TOPIC)
     .setDeserializer(new CustomDeserializer())
     .setGroupId(GROUP_ID)
     .setStartingOffsets(OffsetsInitializer.earliest())
     .build();

DataStream<TestData> testDataStreamSource = env.fromSource(
     source,
     WatermarkStrategy.<TestData>noWatermarks(),
     "Kafka Source"
 );

Kafka consumer - Working

(The Properties object contains group.id, bootstrap.servers, and zookeeper.connect.)

propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer<>(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer);

Both streams use the same pipeline that looks like this

testDataStreamSource
    .assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
    .keyBy(TestData::getKey)
    .window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
    .process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
            @Override
            public void process(
                ....
            });

Things tried

  • Experimenting with offset-commit settings, which did not improve the situation.
  • Assigning timestamps already in the source.

Solution

  • Update: The answer is that the KafkaSource behaves differently than FlinkKafkaConsumer in the case where the number of Kafka partitions is smaller than the parallelism of Flink's kafka source operator. See https://stackoverflow.com/a/70101290/2000823 for details.
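    The idle-partition situation can be sketched as follows. This is an illustration, not the asker's exact code: the partition count (4) is an assumed example, and withIdleness / setParallelism are the two standard remedies when some source subtasks receive no partition and therefore never emit a watermark.

    ```java
    // Sketch, assuming the topic has 4 partitions while the job's default
    // parallelism is higher. Source subtasks with no partition assigned emit
    // no watermarks, so the downstream combined watermark never advances and
    // event-time windows never fire. Two remedies:
    DataStream<TestData> stream = env
        .fromSource(
            source,
            WatermarkStrategy.<TestData>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> event.getTimestamp())
                // Remedy 1: mark subtasks idle after 30s of no input, so they
                // stop holding back the watermark.
                .withIdleness(Duration.ofSeconds(30)),
            "Kafka Source")
        // Remedy 2 (alternative): cap the source parallelism at the
        // partition count so every subtask owns at least one partition.
        .setParallelism(4);
    ```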

    Original answer:

    The problem is almost certainly something related to the timestamps and watermarks.

    To verify that timestamps and watermarks are the problem, you could do a quick experiment where you replace the 3-hour-long event time sliding windows with short processing time tumbling windows.
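    A sketch of that experiment, assuming the same keyBy and window function as in the question (the window function reference here is a placeholder for the asker's ProcessWindowFunction):

    ```java
    // Processing-time windows fire from the wall clock and need no
    // watermarks. If results appear with this variant, the problem is in
    // the timestamps/watermarks rather than the rest of the pipeline.
    testDataStreamSource
        .keyBy(TestData::getKey)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(myProcessWindowFunction); // placeholder: same function as above
    ```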

    In general it is preferred (but not required) to have the KafkaSource do the watermarking. Using forMonotonousTimestamps in a watermark generator applied after the source, as you are doing now, is a risky move. This will only work correctly if the timestamps in all of the partitions being consumed by each parallel instance of the source are processed in order. If more than one Kafka partition is assigned to any of the KafkaSource tasks, this isn't going to happen. On the other hand, if you supply the forMonotonousTimestamps watermarking strategy in the fromSource call (rather than noWatermarks), then all that will be required is that the timestamps be in order on a per-partition basis, which I imagine is the case.
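    Concretely, that means moving the watermark strategy into the fromSource call and dropping the separate assignTimestampsAndWatermarks step. A sketch, reusing the names from the question:

    ```java
    // Watermarking inside the source is done per Kafka partition, so it only
    // requires timestamps to be ascending within each partition, not across
    // all partitions handled by one subtask.
    DataStream<TestData> testDataStreamSource = env.fromSource(
        source,
        WatermarkStrategy.<TestData>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()),
        "Kafka Source");
    // ...and remove the assignTimestampsAndWatermarks call from the pipeline.
    ```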

    As troubling as that is, it's probably not enough to explain why the windows don't produce any results. Another possible root cause is that the test data set doesn't include any events with timestamps after the first window, so that window never closes.

    Do you have a sink? If not, that would explain things.
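    A minimal sanity check is to attach a print sink to the windowed stream, since a Flink job without any sink produces no output at all:

    ```java
    // Debug sink: write every window result to stdout.
    windowedResults.print(); // windowedResults = the stream returned by .process(...)
    ```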

You can use the Flink dashboard to help debug this. Check whether the watermarks are advancing in the window tasks. Enable checkpointing, and then look at how much state the window task has; it should be non-zero.