I'm testing a setup that uses Storm 1.2.2 with Kafka 2.x as the source for my spout, so I created a LocalCluster just for testing purposes.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
Utils.sleep(10000);
After starting this app I got the following:
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4
And after that, a lot of warnings like these:
9664 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
I think this problem is caused by the Kafka version: as you can see, the log shows version "0.10.1.0", but my Kafka version is "2.x".
This is my pom.xml:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${version.storm}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${version.storm}</version>
</dependency>
Where ${version.storm} is 1.2.2.
You are also supposed to declare the version of kafka-clients you are using. The storm-kafka-client POM sets the kafka-clients scope to provided, which means kafka-clients won't be included when you build. We do this so you can easily upgrade.
The only reason it runs for you at all is that you are using LocalCluster in test code, where provided dependencies are present.
Add this to your POM, and it should work:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version-here</version>
</dependency>
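To confirm which kafka-clients version actually ends up on the classpath after changing the POM, a small check like the following may help. It is only a sketch: the class name KafkaClientVersionCheck is hypothetical, and the resource path kafka/kafka-version.properties is an assumption about how the kafka-clients jar packages its version metadata. If the resource is not found, it reports "unknown" rather than failing.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper, not part of Storm or Kafka.
public class KafkaClientVersionCheck {

    // Assumption: kafka-clients ships its version in this classpath resource.
    static final String VERSION_RESOURCE = "kafka/kafka-version.properties";

    // Returns the "version" property from the resource, or "unknown" if the
    // resource is missing, unreadable, or has no such property.
    static String detectVersion() {
        Properties props = new Properties();
        try (InputStream in = KafkaClientVersionCheck.class
                .getClassLoader()
                .getResourceAsStream(VERSION_RESOURCE)) {
            if (in == null) {
                return "unknown";
            }
            props.load(in);
        } catch (IOException e) {
            return "unknown";
        }
        return props.getProperty("version", "unknown").trim();
    }

    public static void main(String[] args) {
        System.out.println("kafka-clients on classpath: " + detectVersion());
    }
}
```

Run it from the same module as the topology so it sees the same dependencies. The "Kafka version" line that the spout logs at startup gives the same information, and mvn dependency:tree shows which kafka-clients artifact Maven resolved.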