Search code examples
apache-kafkaapache-zookeeperapache-stormoffsettrident

Checking Offset of Kafka topic for a storm consumer


I am using storm-kafka-client 1.2.1 and creating my spout config for KafkaTridentSpoutOpaque as below

            kafkaSpoutConfig = KafkaSpoutConfig.builder(brokerURL, kafkaTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG,"storm-kafka-group")
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG,InetAddress.getLocalHost().getHostName())

I am unable to find neither my group-id nor the offset in both Kafka and Zookeeper. Through Zookeeper I tried with zkCli.sh and tried ls /consumers but there were none as I think Kafka itself is now maintaining offsets rather than zookeeper.

I tried with Kafka too with the command below

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand  --list  --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-20130
console-consumer-82696
console-consumer-6106
console-consumer-67393
console-consumer-14333
console-consumer-21174
console-consumer-64550

Can someone help me how I can find my offset and will it replay my events in Kafka again if I restart the topology ?


Solution

  • Trident doesn't store offsets in Kafka, but in Storm's Zookeeper. If you're running with default settings for Storm's Zookeeper config the path in Storm's Zookeeper will be something like /coordinator/<your-topology-id>/meta.

    The objects below that path will contain the first and last offset, as well as topic partition for each batch. So e.g. /coordinator/<your-topology-id>/meta/15 would contain the first and last offset emitted in batch number 15.

    Whether the spout replays offsets after restart is controlled by the FirstPollOffsetStrategy you set in the KafkaSpoutConfig. The default is UNCOMMITTED_EARLIEST, which does not start over on restart. See the Javadoc at https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L126.