apache-kafka apache-storm

How to deploy a storm-core topology with Apache Kafka integration for the first time?


I would like some help with an initial Apache Storm and Kafka setup.

I was able to submit the topology to the Storm cluster, but I get the error below in the Storm UI.

Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists.
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

My code snippet is below.

// The Kafka consumer client uses ZooKeeper to discover the Kafka brokers.
// ZooKeeper host list (host:port)
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot       = "/kafka-cluster-1/brokers/topics";
String topicName    = "myfirsttopic";

/* ****************************************************************** */
/* Topology configuration variable                                    */
/* ****************************************************************** */
/* parallelism hints: the number of tasks assigned to each component  */
Integer boltParallelismHint  = 1;
Integer spoutParallelismHint = 1;

/* ****************************************************************** */
/* Build kafka consumer spout                                         */
/* ****************************************************************** */
// Discover the Kafka brokers through ZooKeeper (broker metadata under brokerZkPath)
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );

// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );

spoutConfig.ignoreZkOffsets = true;

// Deserialize each Kafka message as a String
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );

// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
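In storm-kafka, the zkRoot argument of SpoutConfig is the root under which the spout stores its consumer offsets, and id identifies the spout under that root; it is not the location of the Kafka topics. With the arguments above, the node the lag monitor looks for is zkRoot + "/" + topicName + "/" + the random UUID, which has exactly the shape of the path in the error. The plain-Java sketch below (no Storm dependency) only reproduces that path arithmetic; the class and method names are hypothetical, and the "partition_N" child naming is an assumption about storm-kafka internals.

```java
import java.util.UUID;

// Hypothetical helper (assumption): mirrors how storm-kafka's SpoutConfig is understood
// to lay out committed offsets in ZooKeeper: zkRoot + "/" + id, one child node per partition.
final class SpoutOffsetPaths {

    // Node under which the spout keeps all of its committed offsets.
    static String offsetRoot(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    // Node for a single partition; the "partition_N" naming is an assumption.
    static String partitionPath(String zkRoot, String id, int partition) {
        return offsetRoot(zkRoot, id) + "/partition_" + partition;
    }

    public static void main(String[] args) {
        String zkRoot = "/kafka-cluster-1/brokers/topics";
        String topicName = "myfirsttopic";

        // Same zkRoot and id arguments the question passes to new SpoutConfig(...).
        String spoutZkRoot = zkRoot + "/" + topicName;
        String id = UUID.randomUUID().toString();

        // Prints /kafka-cluster-1/brokers/topics/myfirsttopic/<random uuid>,
        // the same shape as the node the Storm UI reports as missing.
        System.out.println(offsetRoot(spoutZkRoot, id));
        System.out.println(partitionPath(spoutZkRoot, id, 0));
    }
}
```

Because UUID.randomUUID() produces a new id on every submission, each deployment presumably points at a fresh node that does not exist until the spout has committed offsets. A common alternative, not verified against this particular setup, is a dedicated offset root outside Kafka's own /brokers tree (for example a hypothetical "/storm-kafka-offsets") together with a fixed id such as a hypothetical "myfirsttopic-spout".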

Following the storm-kafka documentation, I set ignoreZkOffsets to true:

If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true.

However, judging from the log, the Kafka spout still seems to be reading offsets from ZooKeeper.

Since this is an initial setup, how can I stop Storm from reading offsets from ZooKeeper?
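As far as I understand storm-kafka 1.x, ignoreZkOffsets only changes where the spout starts reading when a new topology instance is deployed: the spout still commits its offsets to ZooKeeper, and the Storm UI's lag monitor (KafkaOffsetLagUtil in the stack trace) still looks them up there. The sketch below is a simplified, hypothetical model of that start-offset decision in plain Java, not the library's code; the class name, parameter names, and the configuredStartOffset value (standing in for the offset resolved from KafkaConfig.startOffsetTime) are assumptions for illustration.

```java
// Simplified model (assumption), not storm-kafka's actual implementation:
// decides which offset a partition consumer starts from.
final class StartOffsetModel {

    static long startOffset(Long committedOffset,
                            boolean sameTopologyInstance,
                            boolean ignoreZkOffsets,
                            long configuredStartOffset) {
        if (committedOffset == null) {
            // Nothing committed in ZooKeeper yet: use the configured start offset.
            return configuredStartOffset;
        }
        if (ignoreZkOffsets && !sameTopologyInstance) {
            // New deployment with ignoreZkOffsets = true: skip the stored offset.
            return configuredStartOffset;
        }
        // Otherwise resume from the offset stored in ZooKeeper.
        return committedOffset;
    }

    public static void main(String[] args) {
        // First deployment: no committed offset, so the configured start (0) is used.
        System.out.println(startOffset(null, false, true, 0L));
        // Redeployment with ignoreZkOffsets: the stored offset 42 is skipped.
        System.out.println(startOffset(42L, false, true, 0L));
        // Restart within the same topology instance: resume at 42.
        System.out.println(startOffset(42L, true, true, 0L));
    }
}
```

Under this model, a random UUID as the spout id means the first branch is taken on every submission anyway, so ignoreZkOffsets has nothing to skip; the UI error would then come from the lag monitor reading an offset node that has not been written yet, not from the spout itself.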

I am using the following versions:

  • Apache Storm 1.2.1
  • Apache Kafka kafka_2.12-1.1.0

Solution

  • I did not do anything special, but under the following conditions the error no longer appears in the Storm UI:

    1. Create the topic in Kafka.
    2. Make sure that the brokerZkPath exists in ZooKeeper (the path to the brokers directory; in my case /kafka-cluster-1/brokers).
    3. Make sure that the zkRoot path exists in ZooKeeper (the path to the topics directory; in my case /kafka-cluster-1/brokers/topics).
    4. Submit the topology to Storm.
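Steps 2 and 3 amount to making sure every node on the configured paths exists before the topology is submitted. The plain ZooKeeper create call fails if the parent node is missing, so each ancestor has to exist in turn. The plain-Java sketch below only computes that list of nodes, parent first; checking or creating them against a real ensemble (for example with zkCli.sh `ls` and `create`, or ZooKeeper.exists(path, false) from the Java client) is left out, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists every ZooKeeper node that must exist for `path`
// to exist, from the top-level node down to `path` itself.
final class ZkPathChain {

    static List<String> requiredNodes(String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("ZooKeeper paths must be absolute: " + path);
        }
        List<String> nodes = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String part : path.split("/")) {
            if (part.isEmpty()) {
                continue; // skip the empty segment before the leading "/" (and any "//")
            }
            current.append('/').append(part);
            nodes.add(current.toString());
        }
        return nodes;
    }

    public static void main(String[] args) {
        // The topics path from step 3; its ancestors include the brokerZkPath from step 2.
        for (String node : requiredNodes("/kafka-cluster-1/brokers/topics")) {
            System.out.println(node);
        }
    }
}
```

For the paths in this question, the list is /kafka-cluster-1, /kafka-cluster-1/brokers and /kafka-cluster-1/brokers/topics, so checking the topics path this way also covers the brokerZkPath.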