I use following code to create one producer which produces around 2000 messages.
public class ProducerDemoWithCallback {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
String bootstrapServers = "localhost:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0; i<2000; i++ ) {
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("TwitterProducer", "Hello World " + Integer.toString(i));
// send data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
logger .info("Received new metadata. \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger .error("Error while producing", e);
// flush data
// flush and close producer
I want to count those messages and get int value. I use this command and it works, but i am trying to get this count using code.
"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"
and the result is
- TwitterProducer:0:2000
My code to do the same programmatically looks something like this, but I'm not sure if this is the correct way to get the count:
int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);
Can someone help me to get count of Kafka messages offset value using code.
You can have a look at implementation details of GetOffsetShell.
Here is a simplified code re-written in Java:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.stream.Collectors;
public class GetOffsetCommand {
private static final Set<String> TopicNames = new HashSet<>();
static {
public static void main(String[] args) {
TopicNames.forEach(topicName -> {
final Map<TopicPartition, Long> offsets = getOffsets(topicName);
new ArrayList<>(offsets.entrySet()).forEach(System.out::println);
System.out.println(topicName + ":" + offsets.values().stream().reduce(0L, Long::sum));
private static Map<TopicPartition, Long> getOffsets(String topicName) {
final KafkaConsumer<String, String> consumer = makeKafkaConsumer();
final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
return consumer.endOffsets(partitions);
private static KafkaConsumer<String, String> makeKafkaConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");
return new KafkaConsumer<>(props);
private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
return consumer.listTopics().entrySet().stream()
.filter(t -> topicName.equals(t.getKey()))
.flatMap(t -> t.getValue().stream())
.map(p -> new TopicPartition(p.topic(), p.partition()))
which produces the offset for each topic's partition and sum (total number of messages), like: