Search code examples
javaapache-kafkakafka-consumer-api

Run kafka consumer without specifying partition


I am learning Kafka recently, and my consumers can't consume any records unless I specify the --parititon 0 parameter. In other words I can NOT consume records like:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic 

but works like:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0

THE MAIN PROBLEM IS, when I moved to java code, my KafkaConsumer class can't fetch records, and I need to know how to specify the partition number in java KafkaConsumer ?!

my current java code is:


public class ConsumerDemo {

    public static void main(String[] args) {

        Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));

        String bootstrapServer = "127.0.0.10:9092";
        String groupId = "my-kafka-java-app";
        String topic = "first-topic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        //properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic
        consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic

        // poll for new data
        while(true){
            //consumer.poll(100); old way
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records){
                logger.info("Key: " + record.key() + ", Value: "+ record.value() );
                logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
            }

        }

    }
}

Solution

  • After a lot of inspection, my solution came out to be using consumer.assign and consumer.seek instead of using consumer.subscribe and without specifying the groupId. But I feel there should be a more optimal solution

    the java code will be as:

            // create consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    
            // subscribe consumer to our topic
            //consumer.subscribe(Collections.singleton(topic)); //means subscription to one topic
            // using assign and Seek, are mostly used to replay data or fetch a specific msg
            TopicPartition  partitionToReadFrom = new TopicPartition(topic, 0);
            long offsetToReadFrom = 15L;
            // assign
            consumer.assign(Arrays.asList(partitionToReadFrom));
    
            // seek: for a specific offset to read from
            consumer.seek(partitionToReadFrom, offsetToReadFrom);