I have implemented a FlinkKafkaConsumer to consume messages from a Kafka topic. The only custom setting other than "group" and "topic" is (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), to enable re-reading the same messages several times. It works out of the box, both for consuming and for the downstream logic.
Now FlinkKafkaConsumer is deprecated, and I wanted to update to its successor, KafkaSource.
Initializing KafkaSource with the same parameters as FlinkKafkaConsumer reads the topic as expected; I can verify this by printing the stream. Deserialization and timestamps seem to work fine. However, the windows are never executed, so no results are produced.
I assume some default setting(s) in KafkaSource differ from those of FlinkKafkaConsumer, but I have no idea what they might be.
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
    .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
    .setTopics(TOPIC)
    .setDeserializer(new CustomDeserializer())
    .setGroupId(GROUP_ID)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .build();
DataStream<TestData> testDataStreamSource = env.fromSource(
    source,
    WatermarkStrategy.<TestData>noWatermarks(),
    "Kafka Source"
);
(Properties contains group.id, bootstrap.servers and zookeeper.connect)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer<>(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer);
Both streams use the same pipeline, which looks like this:
testDataStreamSource
    .assignTimestampsAndWatermarks(WatermarkStrategy.<TestData>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
    .keyBy(TestData::getKey)
    .window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
    .process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
        @Override
        public void process(
        ....
    });
Things tried
Update: The answer is that KafkaSource behaves differently from FlinkKafkaConsumer when the number of Kafka partitions is smaller than the parallelism of Flink's Kafka source operator. See https://stackoverflow.com/a/70101290/2000823 for details.
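One mechanism consistent with this, sketched under assumptions: a downstream operator advances its event-time clock to the minimum watermark across its input subtasks, skipping any subtask flagged idle. If the source runs with more subtasks than the topic has partitions, the subtasks without a partition never emit a watermark, and unless they are flagged idle the minimum never moves. The plain-Java model below is not Flink code; WatermarkMinDemo, combinedWatermark, and the numbers are hypothetical names and values chosen for illustration.

```java
// Simplified model, NOT Flink code: all names here are hypothetical.
final class WatermarkMinDemo {
    // A subtask that has never emitted a watermark is modelled as Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Combines per-subtask watermarks the way a downstream operator would:
     * the minimum over all subtasks that are not flagged idle.
     */
    static long combinedWatermark(long[] subtaskWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < subtaskWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle subtasks do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, subtaskWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Assumed setup: source parallelism 4, topic with only 2 partitions,
        // so subtasks 2 and 3 never see a record.
        long[] watermarks = {10_800_000L, 14_400_000L, NO_WATERMARK, NO_WATERMARK};
        boolean[] noneIdle = {false, false, false, false};
        boolean[] emptyOnesIdle = {false, false, true, true};

        boolean stalled = combinedWatermark(watermarks, noneIdle) == NO_WATERMARK;
        System.out.println("stalled without idleness: " + stalled);
        System.out.println("with idleness: " + combinedWatermark(watermarks, emptyOnesIdle));
    }
}
```

In Flink itself, the usual ways around this are to keep the source parallelism at or below the partition count, or to add withIdleness(...) to the WatermarkStrategy; the linked answer remains the authoritative description of the difference.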
Original answer:
The problem is almost certainly something related to the timestamps and watermarks.
To verify that timestamps and watermarks are the problem, you could do a quick experiment where you replace the 3-hour-long event time sliding windows with short processing time tumbling windows.
In general it is preferred (but not required) to have the KafkaSource do the watermarking. Using forMonotonousTimestamps in a watermark generator applied after the source, as you are doing now, is a risky move. This will only work correctly if the timestamps in all of the partitions being consumed by each parallel instance of the source are processed in order. If more than one Kafka partition is assigned to any of the KafkaSource tasks, this isn't going to happen. On the other hand, if you supply the forMonotonousTimestamps watermarking strategy in the fromSource call (rather than noWatermarks), then all that will be required is that the timestamps be in order on a per-partition basis, which I imagine is the case.
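To make the difference concrete, here is a small plain-Java model (not Flink code) that counts how many events arrive behind the watermark under each approach. It assumes the watermark is "highest timestamp seen, minus 1" and is updated after every event, whereas Flink emits watermarks periodically. PartitionWatermarkDemo and its methods are hypothetical names, and the sample timestamps are made up.

```java
// Simplified model, NOT Flink code: all names here are hypothetical.
final class PartitionWatermarkDemo {

    /** One watermark computed after the partitions have been merged into a single stream. */
    static int behindWhenMerged(long[] timestamps) {
        long maxSeen = Long.MIN_VALUE;
        int behind = 0;
        for (long ts : timestamps) {
            // Watermark is maxSeen - 1, so "ts <= watermark" is the same as "ts < maxSeen".
            if (ts < maxSeen) {
                behind++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return behind;
    }

    /** One watermark per partition; the combined watermark is the minimum of them. */
    static int behindWhenPerPartition(long[] timestamps, int[] partitions, int partitionCount) {
        long[] maxPerPartition = new long[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            maxPerPartition[p] = Long.MIN_VALUE; // nothing seen yet on this partition
        }
        int behind = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long min = Long.MAX_VALUE;
            for (long m : maxPerPartition) {
                min = Math.min(min, m);
            }
            // Combined watermark is min - 1, so "ts <= watermark" is "ts < min".
            if (timestamps[i] < min) {
                behind++;
            }
            int p = partitions[i];
            maxPerPartition[p] = Math.max(maxPerPartition[p], timestamps[i]);
        }
        return behind;
    }

    public static void main(String[] args) {
        // Each partition is in order on its own, but one subtask reads
        // a batch from partition 0 before a batch from partition 1.
        long[] timestamps = {1000L, 2000L, 3000L, 1500L, 2500L};
        int[] partitions = {0, 0, 0, 1, 1};
        System.out.println("merged: " + behindWhenMerged(timestamps));
        System.out.println("per partition: " + behindWhenPerPartition(timestamps, partitions, 2));
    }
}
```

In this model the merged watermark races ahead after the batch from partition 0, so the events from partition 1 count as late, while the per-partition watermark waits for the slower partition.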
As troubling as that is, it's probably not enough to explain why the windows don't produce any results. Another possible root cause is that the test data set doesn't include any events with timestamps after the first window, so that window never closes.
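A rough way to check this against your test data, as a sketch under assumptions: with a 1-hour slide and zero window offset, the first window to close ends at the first hour boundary after the earliest event, and an event-time window fires once the watermark reaches the window end minus 1 ms. With forMonotonousTimestamps the watermark trails the highest timestamp by 1 ms, so the data needs at least one event at or after that boundary. FirstWindowDemo and its methods are hypothetical helpers, and they assume non-negative epoch-millisecond timestamps.

```java
// Simplified model, NOT Flink code: all names here are hypothetical.
final class FirstWindowDemo {
    static final long HOUR_MS = 3_600_000L;

    /**
     * End (exclusive) of the earliest-ending sliding window that contains ts,
     * assuming ts >= 0, zero offset, and a window size that is a multiple of the slide.
     */
    static long firstWindowEnd(long ts, long slideMs) {
        long lastStart = ts - (ts % slideMs); // start of the latest window containing ts
        return lastStart + slideMs;           // the earliest window ends one slide later
    }

    /** True if the earliest window would fire, given the smallest and largest event timestamps. */
    static boolean firstWindowFires(long minTs, long maxTs, long slideMs) {
        long watermark = maxTs - 1; // monotonous-timestamps watermark
        return watermark >= firstWindowEnd(minTs, slideMs) - 1;
    }

    public static void main(String[] args) {
        long first = 10 * HOUR_MS + 30 * 60_000L;       // an event at 10:30
        long stillInsideHour = 10 * HOUR_MS + 59 * 60_000L; // latest event at 10:59
        long nextHour = 11 * HOUR_MS;                   // latest event at 11:00
        System.out.println(firstWindowFires(first, stillInsideHour, HOUR_MS));
        System.out.println(firstWindowFires(first, nextHour, HOUR_MS));
    }
}
```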
Do you have a sink? If not, that would explain things.
You can use the Flink dashboard to help debug this. Look to see if the watermarks are advancing in the window tasks. Turn on checkpointing, and then look to see how much state the window task has -- it should have some non-zero amount of state.