Storm 1.2.2 and Kafka Version 2.x


I'm testing a case using Storm 1.2.2 with Kafka 2.x as my spout source, so I created a LocalCluster just for test purposes.

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
    builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");

    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
    Utils.sleep(10000);

After starting the app, I got the following:

9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4

And after that, a lot of warnings like these:

9664 [Thread-20-kafka_spout-executor[3 3]] INFO  o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN  o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0

I think this problem is caused by a Kafka version mismatch: as you can see, the log shows client version "0.10.1.0", but my Kafka version is "2.x".

This is my pom.xml:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${version.storm}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${version.storm}</version>
    </dependency>

Where ${version.storm} is 1.2.2.


Solution

  • You also need to declare the version of kafka-clients you are using. The storm-kafka-client POM sets the kafka-clients scope to provided, which means kafka-clients won't be included in your build unless you declare it yourself. We do this so you can easily upgrade the Kafka client.

    The only reason it runs at all for you is that you are using LocalCluster in test code, where provided dependencies are present on the classpath.

    Add this to your POM, and it should work:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>your-kafka-version-here</version>
            </dependency>
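
To confirm which kafka-clients version actually ends up on the classpath after changing the POM, a small check like the one below may help. It is only a sketch: the class name KafkaClientVersionCheck and the "not found" fallback string are made up for illustration. It looks up Kafka's AppInfoParser (the same class that prints the "Kafka version" line in the log above) via reflection, so it compiles and runs even when kafka-clients is missing.

```java
import java.lang.reflect.Method;

// Hypothetical helper, not part of Storm or Kafka.
final class KafkaClientVersionCheck {

    /**
     * Returns the kafka-clients version found on the classpath,
     * or "not found" if the library is not present.
     */
    static String kafkaClientsVersion() {
        try {
            // AppInfoParser is the class that logs "Kafka version : ..." at startup.
            Class<?> appInfo = Class.forName("org.apache.kafka.common.utils.AppInfoParser");
            Method getVersion = appInfo.getMethod("getVersion"); // public static String getVersion()
            return String.valueOf(getVersion.invoke(null));
        } catch (ReflectiveOperationException e) {
            // Class or method missing: kafka-clients is not on the classpath.
            return "not found";
        }
    }

    public static void main(String[] args) {
        System.out.println("kafka-clients version: " + kafkaClientsVersion());
    }
}
```

If this is run with the same classpath as the topology and still reports 0.10.1.0, the explicit kafka-clients dependency is probably not being picked up.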