spring-boot · apache-kafka · kafka-consumer-api · spring-kafka

Kafka is not assigning a partition after Consumer.Poll(Duration.ZERO);



I started a project where I implement Apache Kafka.
I already have a working producer that writes data to the topic. So far so good. Now I wanted to write a consumer that reads all the data in the topic.
That is the corresponding code:
try {
    consumer.subscribe(Collections.singletonList("names"));
    if (startingPoint != null) {
        // poll(0) is intended to trigger the partition assignment,
        // then seek back to the beginning of the assigned partitions
        consumer.poll(Duration.ofMillis(0));
        consumer.seekToBeginning(consumer.assignment());
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            keyValuePairs.add(new String[]{record.key(),record.value()});
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

That code doesn't currently work as intended: only new records are consumed. I was able to find out that seekToBeginning() isn't working because no partition is assigned to the consumer at that moment. If I increase the duration of the poll, it works. If I just pause the thread instead, it doesn't.

Could someone please explain to me why that is the case? I tried to find out by myself and already read something about the Kafka heartbeat, but I still haven't fully understood what exactly happens.


Solution

  • Partition assignment takes time; polling with a zero timeout will generally return before it completes. The consumer only participates in the group rebalance protocol while poll() is running, which is why simply pausing the thread between polls does not help.

    You should add a ConsumerRebalanceListener callback to the subscribe() method and perform the seek in onPartitionsAssigned().
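    Applied to the snippet from the question, a minimal sketch might look like the following (it assumes `consumer` is an already-configured `KafkaConsumer<String, String>`, a broker is reachable, and the topic name "names" is taken from the question; this is an illustration, not a verified drop-in):

    ```java
    // Sketch: seek to the beginning once partitions are actually assigned.
    // The callback runs inside poll(), after the assignment has completed,
    // so seekToBeginning() is guaranteed to receive a non-empty set.
    consumer.subscribe(Collections.singletonList("names"), new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do here for this use case
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions);
        }
    });

    // A single poll with a reasonable timeout now both completes the
    // assignment (invoking the callback above) and fetches records.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
    ```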

    EDIT

    @SpringBootApplication
    public class So69121558Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So69121558Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so69121558", "test");
                Consumer<String, String> consumer = cf.createConsumer("group", "");
                consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
    
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    }
    
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        consumer.seekToBeginning(partitions);
                    }
    
                });
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(System.out::println);
                Thread.sleep(5000);
                consumer.close();
            };
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
        }
    
    }
    

    Here are a couple of examples of doing it the Spring way - just add one of these (or both) to the above class.

    @KafkaListener(id = "so69121558", topics = "so69121558")
    void listen(ConsumerRecord<?, ?> rec) {
        System.out.println(rec);
    }
    
    @KafkaListener(id = "so69121558-1", topics = "so69121558")
    void pojoListen(String in) {
        System.out.println(in);
    }
    

    The seeks are done a bit differently too; here's the complete example:

    @SpringBootApplication
    public class So69121558Application extends AbstractConsumerSeekAware {
    
        public static void main(String[] args) {
            SpringApplication.run(So69121558Application.class, args);
        }
    
        @KafkaListener(id = "so69121558", topics = "so69121558")
        void listen(ConsumerRecord<?, ?> rec) {
            System.out.println(rec);
        }
    
        @KafkaListener(id = "so69121558-1", topics = "so69121558")
        void pojoListen(String in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
        }
    
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            callback.seekToBeginning(assignments.keySet());
        }
    
    
    }