Tags: apache-kafka, apache-flink, flink-streaming

FlinkKafkaConsumer010 doesn't work when set with setStartFromTimestamp


I'm using Flink streaming and flink-connector-kafka to process data from Kafka. When I configure FlinkKafkaConsumer010 with setStartFromTimestamp(1586852770000L), all of the records already in Kafka topic A have timestamps before 1586852770000L. I then send some messages to partition-0 and partition-4 of topic A (topic A has 6 partitions, and the current system time is already after 1586852770000L), but my Flink program doesn't consume any data from topic A. Is this an issue?

If I stop my Flink program and restart it, it consumes the data from partition-0 and partition-4 of topic A, but it still won't consume anything from the other 4 partitions if I send data to them, unless I restart my Flink program yet again.

The Kafka log is as follows:

2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1

2020-04-15 11:48:46,481 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]

From the log, every partition except partition-0 and partition-4 has an offset of -1. Why is the returned offset -1 instead of the latest offset?

In the Kafka client's code (Fetcher.java, lines 674-680):

// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
   OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
   timestampOffsetMap.put(topicPartition, offsetData);
}

The value of ListOffsetResponse.UNKNOWN_OFFSET is -1, so the other 4 partitions are filtered out and the Kafka consumer will not consume data from them.
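
The same cutoff can be reproduced outside Flink with the plain Kafka consumer API: offsetsForTimes returns null for any partition that has no record at or after the requested timestamp, which matches the offset=-1/timestamp=-1 entries in the TRACE log above. A minimal sketch (the broker address, group id, and topic name here mirror the test setup below and are just placeholders):

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetsForTimesCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "offset-check")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // 2020/4/15 20:00:00, the same timestamp as in the test program below
      val timestamp = java.lang.Long.valueOf(1586952000000L)
      // ask every partition of the topic for the earliest offset whose
      // record timestamp is >= the given timestamp
      val query = consumer.partitionsFor("message").asScala
        .map(p => new TopicPartition(p.topic, p.partition) -> timestamp)
        .toMap.asJava
      // partitions with no record at or after the timestamp map to null;
      // these are exactly the partitions the 0.10 connector drops
      consumer.offsetsForTimes(query).asScala.foreach {
        case (tp, offsetAndTimestamp) => println(s"$tp -> $offsetAndTimestamp")
      }
    } finally {
      consumer.close()
    }
  }
}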

My Flink version is 1.9.2 and the Flink Kafka connector dependency is:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.9.2</version>
</dependency>

The documentation of the Flink Kafka connector says:

setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record.

Test program code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test

class TestFlinkKafka {

  @Test
  def testFlinkKafkaDemo: Unit ={
    //1. set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
    env.enableCheckpointing(60000)
    //2. kafka source
    val topic = "message"
    val schema = new SimpleStringSchema()
    //server1:9092,server2:9092,server3:9092
    val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
    val consumer = new FlinkKafkaConsumer010(topic, schema, props)
    // start consuming from the offsets that match the given timestamp
    // 2020/4/14 20:00:00
    //consumer.setStartFromTimestamp(1586865600000L)
    // 2020/4/15 20:00:00
    consumer.setStartFromTimestamp(1586952000000L)
    consumer.setCommitOffsetsOnCheckpoints(true)

    //3. transform
    val stream = env.addSource(consumer)
      .map(x => x)

    //4. sink
    stream.print()

    //5. execute
    env.execute("testFlinkKafkaConsumer")

  }

  def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", offsetReset)
    props.setProperty("flink.partition-discovery.interval-millis", "30000")
    props
  }

}

Set the log level for Kafka:

log4j.logger.org.apache.kafka=TRACE

Create the Kafka topic:

kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1

Send messages to the Kafka topic:

kafka-console-producer --broker-list localhost:9092 --topic message

{"name":"tom"}
{"name":"michael"}

Solution

  • This problem was resolved by upgrading the Flink/Kafka connector to the newer, universal connector, FlinkKafkaConsumer, available from flink-connector-kafka_2.11. This version of the connector is recommended for all Kafka versions from 1.0.0 onward. With Kafka 0.10.x or 0.11.x, it is better to use the version-specific flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.11_2.11 connectors. (In all cases, substitute 2.12 for 2.11 if you are using Scala 2.12.) A minimal migration sketch is shown below.

    See the Flink documentation for more information on Flink's Kafka connector.
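
    As a rough migration sketch (assuming the same topic, schema, and props as in the test program above), the dependency becomes:

    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka_2.11</artifactId>
       <version>1.9.2</version>
    </dependency>

    and only the consumer class changes; setStartFromTimestamp is configured the same way:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // universal connector (FlinkKafkaConsumer), recommended for Kafka 1.0.0+
    val consumer = new FlinkKafkaConsumer(topic, schema, props)
    consumer.setStartFromTimestamp(1586952000000L)
    consumer.setCommitOffsetsOnCheckpoints(true)
    val stream = env.addSource(consumer)

    With the universal connector, a partition whose latest record is older than the startup timestamp is read from its latest offset, matching the setStartFromTimestamp behavior quoted from the documentation above, so messages produced after startup are consumed.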