I would like some help with an initial Apache Storm and Kafka setup.
I was able to submit the topology to the Storm cluster, but I am getting the error below in the Storm UI.
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists.
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)
My code snippet is below.
// The Kafka spout relies on ZooKeeper to discover the Kafka brokers.
// ZooKeeper host list
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot = "/kafka-cluster-1/brokers/topics";
String topicName = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable */
/* ****************************************************************** */
/* parallelism hints: the initial number of executors (threads) for the bolt and the spout */
Integer boltParalismHint = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout */
/* ****************************************************************** */
// Build the broker lookup: ZooKeeper connection string plus the ZK path where Kafka registers its brokers
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Following the documentation, I set ignoreZkOffsets to true. The documentation says:
"If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true."
However, judging from the log, the Kafka spout still seems to be reading the offset from ZooKeeper.
Since this is an initial setup, how can I stop Storm from reading offsets from ZooKeeper?
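To show why I think the lookup still goes to ZooKeeper: the zk-node in the error looks like the third SpoutConfig argument joined with the fourth one (the random UUID). Below is a minimal sketch using plain strings only, with no Storm classes. The class and method names are hypothetical, and the joining rule is my assumption based on the error message, not something I have confirmed in the Storm source.

```java
public class OffsetPathSketch {

    // Assumption: the consumer state is looked up under <zkRoot>/<id>,
    // where both values are the ones passed to SpoutConfig.
    static String consumerStatePath(String spoutZkRoot, String spoutId) {
        return spoutZkRoot + "/" + spoutId;
    }

    public static void main(String[] args) {
        String zkRoot = "/kafka-cluster-1/brokers/topics";
        String topicName = "myfirsttopic";
        // In the topology this is UUID.randomUUID().toString(); the value
        // here is the one that showed up in the error message.
        String id = "aadb3eb4-2224-4c18-b8ad-6959a1c9f607";

        // Same expression as the third SpoutConfig argument in my snippet.
        String spoutZkRoot = zkRoot + "/" + topicName;

        System.out.println(consumerStatePath(spoutZkRoot, id));
    }
}
```

If that assumption holds, the id is a fresh UUID on every submit, so a node with that name would not exist until the spout has written offsets under it at least once.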
I use below versions.
I did not do anything special, but under the condition below the error does not seem to appear in the Storm UI.