I want my Flink program to receive only the latest data from Kafka, but Flink is reading historical data instead.
I have set auto.offset.reset to latest as shown below, but it did not work:
properties.setProperty("auto.offset.reset", "latest");
The Flink program receives the data from Kafka using the code below:
//getting stream from Kafka and giving it assignTimestampsAndWatermarks
DataStream<JoinedStreamEvent> raw_stream = envrionment.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new JoinSchema(), properties)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
I was following the discussion on https://issues.apache.org/jira/browse/FLINK-4280, which suggests adding the source in the following way:
Properties props = new Properties();
...
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
kafka.setForwardMetrics(boolean);
...
env.addSource(kafka)
I did the same, but I was not able to access setStartFromLatest() on my consumer:
FlinkKafkaConsumer09<JoinedStreamEvent> kafka = new FlinkKafkaConsumer09<JoinedStreamEvent>("test", new JoinSchema(), properties);
What should I do to receive the latest values that are being sent to Kafka rather than receiving values from history?
The problem was solved by creating a new group id named test1 for both the sender and the consumer, while keeping the topic name the same (test).
Now I am wondering: is this the best way to solve this issue? With this approach I need to supply a new group id every time.
Is there some way to read only the data that is currently being sent to Kafka?
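One way to avoid inventing a group id by hand on every run is to generate it at startup. The sketch below assumes the historical reads come from offsets already committed for the reused group id; Kafka applies auto.offset.reset only when the group has no committed offset, so a fresh group id lets the latest setting take effect. The class name LatestOnlyConsumerConfig, the helper build, the broker address and the test prefix are hypothetical; only the property keys are standard Kafka consumer settings.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class LatestOnlyConsumerConfig {

    // Builds consumer properties whose group.id is unique per call, so the
    // group has no committed offsets and auto.offset.reset is consulted.
    static Properties build(String bootstrapServers, String groupPrefix) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // A random suffix replaces the manually chosen "test1", "test2", ...
        properties.setProperty("group.id", groupPrefix + "-" + UUID.randomUUID());
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with the real one.
        Properties properties = build("localhost:9092", "test");
        System.out.println(properties.getProperty("group.id"));
    }
}
```

The returned Properties object could be passed to the FlinkKafkaConsumer09 constructor in place of the properties object used above. The trade-off is that every run starts as a brand-new consumer group, so messages produced while the job is down are skipped on the next run.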