try {
    consumer.subscribe(Collections.singletonList("names"));
    if (startingPoint != null) {
        consumer.poll(Duration.ofMillis(0));
        consumer.seekToBeginning(consumer.assignment());
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        keyValuePairs.add(new String[]{record.key(), record.value()});
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}
This code doesn't currently work as intended: only new records are consumed. I found out that seekToBeginning() has no effect because no partition is assigned to the consumer at that moment. If I increase the duration of the poll, it works. If I just pause the thread instead, it doesn't.
Could someone please explain why that is the case? I tried to find out myself and have already read something about a Kafka heartbeat, but I still haven't fully understood what exactly happens.
The assignment takes time; with a timeout of 0, the poll will generally return before the assignment occurs.
You should add a ConsumerRebalanceListener callback to the subscribe() method and perform the seek in onPartitionsAssigned().
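To picture the timing, here is a toy model that is entirely separate from the Kafka client. ToyConsumer, STEPS_TO_JOIN and STEP_COST_MS are hypothetical names and numbers chosen for illustration. The model assumes that the work needed to join the group advances only while poll() is running, which would be consistent with the observation that a longer poll helps and pausing the thread does not. It is a sketch of the idea, not of how the client is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class PollTimingSketch {

    /** Toy stand-in for a consumer. This is NOT the Kafka API. */
    static class ToyConsumer {
        private static final int STEPS_TO_JOIN = 3;    // hypothetical: round trips needed to join the group
        private static final long STEP_COST_MS = 100;  // hypothetical: time budget used by one round trip

        private final Consumer<List<Integer>> onAssigned;
        private final List<Integer> assignment = new ArrayList<>();
        private int stepsDone = 0;

        ToyConsumer(Consumer<List<Integer>> onAssigned) {
            this.onAssigned = onAssigned;
        }

        /** Join work advances only in here, and only as far as the timeout allows. */
        List<Integer> poll(long timeoutMs) {
            long budget = timeoutMs;
            advance(); // one non-blocking attempt, even with a zero timeout
            while (assignment.isEmpty() && budget >= STEP_COST_MS) {
                budget -= STEP_COST_MS;
                advance();
            }
            return Collections.unmodifiableList(assignment);
        }

        private void advance() {
            if (!assignment.isEmpty()) {
                return; // already joined, nothing left to do
            }
            stepsDone++;
            if (stepsDone == STEPS_TO_JOIN) {
                assignment.add(0);             // partition 0 is now assigned
                onAssigned.accept(assignment); // the callback fires from inside poll()
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyConsumer consumer = new ToyConsumer(p -> log.add("callback: seek " + p));
        log.add("after poll(0): " + consumer.poll(0));
        // Sleeping here would change nothing in this model: no join work happens outside poll().
        log.add("after poll(500): " + consumer.poll(500));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the first line printed is `after poll(0): []`, so a seek on the assignment at that point has nothing to act on. The callback line appears before `after poll(500): [0]`, which is why a seek placed in the callback runs at the right moment regardless of which poll completes the join.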
EDIT
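import java.time.Duration;
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So69121558Application {
    public static void main(String[] args) {
        SpringApplication.run(So69121558Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69121558", "test");
            Consumer<String, String> consumer = cf.createConsumer("group", "");
            consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Called from within poll() once the partitions are assigned, so the seek takes effect.
                    consumer.seekToBeginning(partitions);
                }
            });
            // A non-zero timeout gives the assignment time to complete.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(System.out::println);
            Thread.sleep(5000);
            consumer.close();
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
    }
}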
Here are a couple of examples of doing it the Spring way - just add one of these (or both) to the above class.
@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
    System.out.println(rec);
}

@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
    System.out.println(in);
}
The seeks are done a bit differently too; here's the complete example:
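import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;

@SpringBootApplication
public class So69121558Application extends AbstractConsumerSeekAware {
    public static void main(String[] args) {
        SpringApplication.run(So69121558Application.class, args);
    }

    @KafkaListener(id = "so69121558", topics = "so69121558")
    void listen(ConsumerRecord<?, ?> rec) {
        System.out.println(rec);
    }

    @KafkaListener(id = "so69121558-1", topics = "so69121558")
    void pojoListen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
    }

    // The listener container calls this when partitions are assigned; seek through the callback.
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }
}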