apache-kafka, apache-flink, flink-sql

Flink SQL does not honor "table.exec.source.idle-timeout" setting


I have a Flink job running FlinkSQL with the following setup:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

To test this locally with a Kafka source, I fired a few events at the Flink job. The Flink UI shows it produced one watermark. I then waited 3 minutes to see whether the watermark would advance without any new events arriving (i.e., with idle partitions), but no watermark advancement occurred.

Note: I run a local Kafka broker with three partitions, and my test data is keyed, so all events land in the same partition. Even so, the watermark does not advance after waiting 3 minutes, despite the other two partitions being idle.

  1. Is there any place in the job UI where I can confirm that the 3-minute value I set is actually picked up? Am I using the right units (seconds vs. milliseconds)?

  2. Is there anything else I could check to test this setting?
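On the units question: Flink parses duration-typed options from a number-plus-unit string, so "180000 ms" and "3 min" should denote the same timeout. A minimal sanity check, assuming Flink's `TimeUtils` (from `flink-core`, the same parser used for duration config values) is on the classpath:

```java
import org.apache.flink.util.TimeUtils;

import java.time.Duration;

public class IdleTimeoutCheck {
    public static void main(String[] args) {
        // Flink parses duration config values as "<number> <unit>",
        // with unit suffixes such as "ms", "s", "min". An unparsable
        // string would throw here rather than being silently ignored.
        Duration timeout = TimeUtils.parseDuration("180000 ms");
        System.out.println(timeout.toMinutes() + " min");
    }
}
```

If this prints 3 minutes, the string format itself is not the problem.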

We are running Flink 1.12.1.

Update: I see the following exception in my Flink SQL job's Exceptions tab. I wonder if there is a version mismatch.

2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)

Solution

  • The issue was that this setting does not work in Flink 1.12.0 or 1.12.1. After upgrading to Flink 1.13.2, the setting was honored and worked as expected.

    The exception was a red herring and not consistently reproducible.
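For anyone who cannot upgrade immediately, the same idleness semantics are available at the DataStream level via `WatermarkStrategy.withIdleness`, which predates 1.13. A sketch, where the out-of-orderness bound is an illustrative placeholder:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import java.time.Duration;

public class IdlenessExample {
    public static void main(String[] args) {
        // A watermark strategy that tolerates 5 seconds of out-of-orderness
        // and marks a source partition idle after 3 minutes without events,
        // so the remaining partitions can advance the watermark on their own.
        WatermarkStrategy<String> strategy =
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofMinutes(3));

        // The strategy would then be attached to the Kafka source, e.g. via
        // consumer.assignTimestampsAndWatermarks(strategy) on the legacy
        // FlinkKafkaConsumer, instead of relying on the table option.
        System.out.println("strategy built: " + (strategy != null));
    }
}
```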