Search code examples
Tags: java, apache-kafka, kafka-producer-api

How to get the number of messages a Kafka producer has written to a topic


I use the following code to create a producer that sends 2000 messages.

/**
 * Sends 2000 {@code "Hello World <i>"} records to the {@code TwitterProducer} topic on a
 * broker at {@code localhost:9092} and logs the metadata (or the error) reported for each send.
 */
public class ProducerDemoWithCallback {

    private static final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

    public static void main(String[] args) {
        final String bootstrapServers = "localhost:9092";
        final Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees close() runs even if send() throws part-way
        // through the loop; previously close() was only reached on the happy path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 2000; i++) {
                final ProducerRecord<String, String> record =
                        new ProducerRecord<>("TwitterProducer", "Hello World " + i);

                // send is asynchronous; the callback fires once per record, either with
                // the record's metadata (success) or with the exception that occurred.
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        logger.info("Received new metadata. \nTopic:{}\nPartition: {}\nOffset: {}\nTimestamp: {}",
                                recordMetadata.topic(),
                                recordMetadata.partition(),
                                recordMetadata.offset(),
                                recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                });
            }

            // Push out everything still buffered before the implicit close().
            producer.flush();
        }
    }
}

I want to count those messages and get the count as an int. The following command works, but I am trying to get the same count programmatically.

"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"

and the result is

- TwitterProducer:0:2000

My code to do the same programmatically looks something like this, but I'm not sure if this is the correct way to get the count:

 int valueCount = (int) recordMetadata.offset();
 System.out.println("Offset value " + valueCount); 

Can someone help me get the message count (the value GetOffsetShell reports) programmatically?


Solution

  • You can look at the implementation of GetOffsetShell.

    Here is a simplified version rewritten in Java:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.*;
    import java.util.stream.Collectors;
    
    /**
     * Prints, for each configured topic, the end offset of every partition followed by the
     * number of messages in the topic, similar to
     * {@code kafka-run-class.sh kafka.tools.GetOffsetShell --time -1}.
     */
    public class GetOffsetCommand {

        /** Topics to inspect; a topic that does not exist yields no partitions and a count of 0. */
        private static final Set<String> TOPIC_NAMES =
                Collections.unmodifiableSet(new HashSet<>(Arrays.asList("my-topic", "not-my-topic")));

        public static void main(String[] args) {
            // One consumer serves every topic, and try-with-resources guarantees it is closed
            // (previously a new consumer was created per topic and never closed).
            try (final KafkaConsumer<String, String> consumer = makeKafkaConsumer()) {
                for (final String topicName : TOPIC_NAMES) {
                    final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
                    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

                    // Prints "<topic>-<partition>=<endOffset>" for each partition.
                    endOffsets.entrySet().forEach(System.out::println);
                    System.out.println(topicName + ":" + countMessages(consumer, partitions, endOffsets));
                }
            }
        }

        /**
         * Returns the number of offsets currently retained across the given partitions.
         *
         * <p>Summing end offsets alone over-counts once retention has advanced a partition's
         * log start offset, so each partition's beginning offset is subtracted. For compacted
         * or transactional topics the result is still an upper bound, because some offsets in
         * that range may not correspond to readable records.
         *
         * @param consumer   consumer used to look up beginning offsets
         * @param partitions partitions whose offsets were queried
         * @param endOffsets end offset per partition, as returned by {@code consumer.endOffsets}
         * @return sum over partitions of {@code endOffset - beginningOffset}
         */
        private static long countMessages(KafkaConsumer<String, String> consumer,
                                          List<TopicPartition> partitions,
                                          Map<TopicPartition, Long> endOffsets) {
            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            long total = 0L;
            for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                total += entry.getValue() - beginningOffsets.getOrDefault(entry.getKey(), 0L);
            }
            return total;
        }

        /** Builds a String/String consumer pointed at the local broker. */
        private static KafkaConsumer<String, String> makeKafkaConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");

            return new KafkaConsumer<>(props);
        }

        /**
         * Lists all partitions of {@code topicName} known to the cluster.
         *
         * <p>Filtering {@code listTopics()} (rather than asking for the topic directly) means a
         * missing topic simply produces an empty list.
         */
        private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
            return consumer.listTopics().entrySet().stream()
                    .filter(t -> topicName.equals(t.getKey()))
                    .flatMap(t -> t.getValue().stream())
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
        }
    }
    

    which prints the end offset of each of the topic's partitions, followed by their sum (the total number of messages), like:

    my-topic-0=184
    my-topic-2=187
    my-topic-4=189
    my-topic-1=196
    my-topic-3=243
    my-topic:999