Tags: java, parallel-processing, apache-kafka, apache-storm, trident

Apache Storm Trident and Kafka Spout Integration


I am unable to find good documentation on correctly integrating Kafka with Apache Storm Trident. I looked into related questions previously posted here, but found no sufficient information.

I would like to connect Trident with Kafka as an OpaqueTridentKafkaSpout. Here is the sample code that is currently working:

GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(properties.getProperty("topic", "mytopic"));
Broker brokerForPartition0 = new Broker("IP1", 9092);
Broker brokerForPartition1 = new Broker("IP2", 9092);
Broker brokerForPartition2 = new Broker("IP3", 9092);

globalPartitionInformation.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
globalPartitionInformation.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
globalPartitionInformation.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts staticHosts = new StaticHosts(globalPartitionInformation);
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(staticHosts, properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

With this, I am able to generate streams for my topology, as shown in the code below:

TridentTopology topology = new TridentTopology();
Stream analyticsStream = topology.newStream("spout", kafkaSpout)
        .parallelismHint(Integer.valueOf(properties.getProperty("spout", "6")));

Although I have set a parallelism hint and configured my partitions, only one Kafka spout executor is running, so I am unable to scale it well.

Can anyone please guide me toward a better way of integrating Apache Storm Trident (2.0.0) with Apache Kafka (1.0), each running on a 3-node cluster?

Also, as soon as it finishes reading from Kafka, I keep getting these logs constantly:

2018-04-09 14:17:34.119 o.a.s.k.KafkaUtils Thread-15-spout-spout-executor[79 79] [INFO] Metrics Tick: Not enough data to calculate spout lag.
2018-04-09 14:17:34.129 o.a.s.k.KafkaUtils Thread-21-spout-spout-executor[88 88] [INFO] Metrics Tick: Not enough data to calculate spout lag.

And in the Storm UI, I can see acks for the messages above. Any suggestions on how to suppress these metrics ticks?


Solution

  • If you are on Storm 2.0.0 anyway, I think you should switch to the storm-kafka-client Trident spout. The storm-kafka module is only intended to support older Kafka versions, since the underlying Kafka API (SimpleConsumer) is being removed. The new module supports Kafka 0.10.0.0 and onward.

    You can find an example Trident topology for the new spout here: https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java. A rough sketch of the setup is also shown below.
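
    Roughly, the storm-kafka-client setup replaces the broker/partition mapping with a bootstrap-server list and a consumer-style config. This is only an outline based on the linked example, not a definitive implementation: the class and builder names (KafkaTridentSpoutConfig, KafkaTridentSpoutOpaque), as well as the bootstrap servers, topic, and group id values, are assumptions you should verify against the example for your exact Storm version.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;

    // Sketch only: verify class and builder names against the linked example for your Storm version.
    // "IP1:9092,IP2:9092,IP3:9092", "mytopic" and "trident-consumer" are placeholder values.
    KafkaTridentSpoutConfig<String, String> spoutConfig = KafkaTridentSpoutConfig
            .builder("IP1:9092,IP2:9092,IP3:9092", "mytopic")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "trident-consumer")
            .build();

    // Opaque transactional Trident spout backed by the new KafkaConsumer-based client
    KafkaTridentSpoutOpaque<String, String> kafkaSpout = new KafkaTridentSpoutOpaque<>(spoutConfig);

    TridentTopology topology = new TridentTopology();
    Stream analyticsStream = topology.newStream("kafka-spout", kafkaSpout)
            .parallelismHint(3); // at most one useful executor per Kafka partition

    Note that regardless of the spout implementation, the effective spout parallelism is bounded by the number of Kafka partitions, so a parallelism hint above the partition count leaves the extra executors idle.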