
Apache Storm JoinBolt


I am trying to join two Kafka data streams (using Kafka spouts) into one using JoinBolt, following the documentation at http://storm.apache.org/releases/1.1.2/Joins.html.

The documentation says that each of JoinBolt's incoming data streams must be fields-grouped on a single field, and that a stream should only be joined with the other streams using the field on which it has been fields-grouped.

Code snippet:

    KafkaSpout kafka_spout_1 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-1", "/spout-1", "spout-1"); // String zkHosts, String topic, String zkRoot, String spoutId
    KafkaSpout kafka_spout_2 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-2", "/spout-2", "spout-2"); // String zkHosts, String topic, String zkRoot, String spoutId

    topologyBuilder.setSpout("kafka-spout-1", kafka_spout_1, 1);
    topologyBuilder.setSpout("kafka-spout-2", kafka_spout_2, 1);

    JoinBolt joinBolt = new JoinBolt("kafka-spout-1", "id")
            .join("kafka-spout-2", "deptId", "kafka-spout-1")
            .select("id,deptId,firstName,deptName")
            .withTumblingWindow(new Duration(10, TimeUnit.SECONDS));

    topologyBuilder.setBolt("joiner", joinBolt, 1)
            .fieldsGrouping("kafka-spout-1", new Fields("id"))
            .fieldsGrouping("kafka-spout-2", new Fields("deptId"));

kafka-spout-1 sample record --> {"id" : 1 ,"firstName" : "Alyssa" , "lastName" : "Parker"}

kafka-spout-2 sample record --> {"deptId" : 1 ,"deptName" : "Engineering"}

I got the following exception when deploying the topology with the above code snippet:

[main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"}
java.lang.RuntimeException: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at BuildTopology.runTopology(BuildTopology.java:71)
    at Main.main(Main.java:6)
Caused by: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 4 more

How can I solve this issue?

Thank you, any help will be appreciated.


Solution

  • Consider using storm-kafka-client instead of storm-kafka if you're doing new development; storm-kafka is deprecated.

    Does the spout actually emit a field called "deptId"?

    Your configuration snippet doesn't show you setting SpoutConfig.scheme, and your example records suggest that you're emitting JSON documents containing a "deptId" field.

    Storm doesn't know anything about JSON or the contents of the strings coming out of the spout. You need to define a scheme that makes the spout emit the "deptId" field separately from the rest of the record.

    Here's the relevant snippet from one of the built-in schemes, which emits the message, partition, and offset as separate fields:

    @Override
    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
        String stringMessage = StringScheme.deserializeString(message);
        return new Values(stringMessage, partition.partition, offset);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
    }
    

    See https://github.com/apache/storm/blob/v1.2.2/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java for reference.

    An alternative to using a scheme is to put a bolt between the spout and the JoinBolt that extracts the "deptId" from the record and emits it as a separate field alongside the record.
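    If you take the intermediate-bolt approach, the core of it is just pulling the "deptId" value out of the JSON string before re-emitting the tuple. Here's a minimal, Storm-independent sketch of that extraction logic (the `JsonFieldExtractor` class is hypothetical, and production code would normally use a JSON library such as Jackson or Gson rather than hand-rolled string scanning; this version assumes flat JSON with no nesting or escaped quotes):

```java
// Hypothetical helper illustrating the extraction step an intermediate bolt
// (or a custom scheme) would perform on the raw record string.
public class JsonFieldExtractor {

    // Returns the value of a top-level field as a string, or null if absent.
    // Assumes flat JSON like {"deptId" : 1 ,"deptName" : "Engineering"}.
    public static String extractField(String json, String field) {
        String needle = "\"" + field + "\"";
        int keyIdx = json.indexOf(needle);
        if (keyIdx < 0) return null;
        int colon = json.indexOf(':', keyIdx + needle.length());
        if (colon < 0) return null;
        int end = json.indexOf(',', colon + 1);
        if (end < 0) end = json.indexOf('}', colon + 1);
        if (end < 0) return null;
        // Trim whitespace and strip surrounding quotes from string values.
        return json.substring(colon + 1, end).trim().replaceAll("^\"|\"$", "");
    }

    public static void main(String[] args) {
        String record = "{\"deptId\" : 1 ,\"deptName\" : \"Engineering\"}";
        System.out.println(extractField(record, "deptId"));   // prints 1
        System.out.println(extractField(record, "deptName")); // prints Engineering
    }
}
```

    In the bolt's `execute` method you would run this extraction on the incoming record and emit something like `new Values(deptId, record)`, declaring output fields such as "deptId" and "record" so that the downstream fieldsGrouping and JoinBolt can see the "deptId" field.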