I am trying to join two Kafka data streams (using Kafka spouts) into one using JoinBolt with the following code snippet, based on http://storm.apache.org/releases/1.1.2/Joins.html.
The documentation says that each of JoinBolt's incoming data streams must be fields-grouped on a single field, and that a stream should only be joined with the other streams using the field on which it has been fields-grouped.
Code snippet:
KafkaSpout kafka_spout_1 = SpoutBuilder.buildSpout("127.0.0.1:2181","test-topic-1", "/spout-1", "spout-1");//String zkHosts, String topic, String zkRoot, String spoutId
KafkaSpout kafka_spout_2 = SpoutBuilder.buildSpout("127.0.0.1:2181","test-topic-2", "/spout-2", "spout-2");//String zkHosts, String topic, String zkRoot, String spoutId
topologyBuilder.setSpout("kafka-spout-1", kafka_spout_1, 1);
topologyBuilder.setSpout("kafka-spout-2", kafka_spout_2, 1);
JoinBolt joinBolt = new JoinBolt("kafka-spout-1", "id")
.join("kafka-spout-2", "deptId", "kafka-spout-1")
.select("id,deptId,firstName,deptName")
.withTumblingWindow(new Duration(10, TimeUnit.SECONDS));
topologyBuilder.setBolt("joiner", joinBolt, 1)
.fieldsGrouping("kafka-spout-1", new Fields("id"))
.fieldsGrouping("kafka-spout-2", new Fields("deptId"));
kafka-spout-1 sample record --> {"id" : 1 ,"firstName" : "Alyssa" , "lastName" : "Parker"}
kafka-spout-2 sample record --> {"deptId" : 1 ,"deptName" : "Engineering"}
I got the following exception while deploying the topology using the above code snippet:
[main] WARN o.a.s.StormSubmitter - Topology submission exception: Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"}
java.lang.RuntimeException: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at BuildTopology.runTopology(BuildTopology.java:71)
at Main.main(Main.java:6)
Caused by: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 4 more
How can I solve this issue?
Thank you, any help will be appreciated.
Consider using storm-kafka-client instead of storm-kafka if you're doing new development. Storm-kafka is deprecated.
Does the spout actually emit a field called "deptId"?
Your configuration snippet doesn't show you setting SpoutConfig.scheme, and your example records suggest that you're emitting JSON documents containing a "deptId" field.
Storm doesn't know anything about JSON or the contents of the strings coming out of the spout. You need to define a scheme that makes the spout emit the "deptId" field separately from the rest of the record.
Here's the relevant snippet from one of the built-in schemes, which emits the message, partition and offset in separate fields:
@Override
public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
// Emit the message text, the partition number and the offset as three separate values
String stringMessage = StringScheme.deserializeString(message);
return new Values(stringMessage, partition.partition, offset);
}
@Override
public Fields getOutputFields() {
// Declare one field name per emitted value, in the same order
return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
}
See https://github.com/apache/storm/blob/v1.2.2/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java for reference.
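As a rough sketch of what such a scheme could do for the records in the question, here is a plain-JDK class that decodes the message and emits one value per requested JSON key. It deliberately avoids Storm and JSON-library dependencies so it runs on its own. The class name JsonFieldsScheme is hypothetical, and the regex-based extraction is an assumption that only copes with flat objects like the sample records. In a real topology, as I understand the storm-kafka 1.x API, you would put this logic in a class implementing org.apache.storm.spout.Scheme (deserialize(ByteBuffer) plus getOutputFields() returning new Fields(...)).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, dependency-free sketch of a JSON scheme. In Storm this logic would
// live in a class implementing org.apache.storm.spout.Scheme, and getOutputFields()
// would return new Fields(...) instead of a List<String>.
class JsonFieldsScheme {
    private final List<String> fieldNames;

    JsonFieldsScheme(String... fieldNames) {
        this.fieldNames = Collections.unmodifiableList(Arrays.asList(fieldNames));
    }

    // Decode the Kafka message and pull out one value per declared field, in order.
    // Missing fields become null. Assumption: flat JSON objects only; use a real
    // JSON parser (e.g. Jackson) for anything more complex.
    List<Object> deserialize(ByteBuffer message) {
        String json = StandardCharsets.UTF_8.decode(message).toString();
        List<Object> values = new ArrayList<>();
        for (String name : fieldNames) {
            values.add(extract(json, name));
        }
        return values;
    }

    // The field names the spout would declare; fieldsGrouping and JoinBolt refer to these.
    List<String> getOutputFields() {
        return fieldNames;
    }

    private static String extract(String json, String name) {
        Pattern p = Pattern.compile(
            "\"" + Pattern.quote(name) + "\"\\s*:\\s*(?:\"((?:[^\"\\\\]|\\\\.)*)\"|([^,}\\s]+))");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            return null;
        }
        // Group 1 holds a quoted string value, group 2 a bare number/boolean/null token.
        return m.group(1) != null ? m.group(1) : m.group(2);
    }
}
```

With something like this, the spout for test-topic-1 would use new JsonFieldsScheme("id", "firstName", "lastName") and the one for test-topic-2 new JsonFieldsScheme("deptId", "deptName"), set on the SpoutConfig's scheme field via new SchemeAsMultiScheme(...) inside your SpoutBuilder.buildSpout. The spouts would then declare id and deptId as real fields, so both the fieldsGrouping calls and the JoinBolt select list can refer to them. Both keys come out as the string "1" here, so they compare equal in the join; if you parse numbers instead, convert both sides the same way.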
An alternative to using a scheme is to add a bolt between the spout and the JoinBolt that extracts "deptId" from the record and emits it as a separate field alongside the record.
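For the bolt route, here is a similarly hedged plain-JDK sketch of the per-tuple work such a bolt could do: take the raw Kafka payload, pull out the key, and emit it next to the unchanged record. KeyExtractor, its methods and the record field names are hypothetical. In Storm this would sit inside a bolt (for example one extending BaseBasicBolt) whose declareOutputFields() declares outputFields() and whose execute() emits the list returned by toOutputValues(...).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, dependency-free sketch of a "key extractor" bolt's per-tuple logic.
class KeyExtractor {
    private final String keyField;
    private final String recordField;
    private final Pattern keyPattern;

    KeyExtractor(String keyField, String recordField) {
        this.keyField = keyField;
        this.recordField = recordField;
        // Matches a bare or quoted value without spaces for the key in a flat JSON object.
        this.keyPattern = Pattern.compile(
            "\"" + Pattern.quote(keyField) + "\"\\s*:\\s*\"?([^\",}\\s]+)");
    }

    // Fields the bolt would declare: the join key first, then the untouched record.
    List<String> outputFields() {
        return Arrays.asList(keyField, recordField);
    }

    // Turn the raw Kafka payload into [keyValue, record]; throws if the key is absent.
    List<Object> toOutputValues(byte[] rawMessage) {
        String record = new String(rawMessage, StandardCharsets.UTF_8);
        Matcher m = keyPattern.matcher(record);
        if (!m.find()) {
            throw new IllegalArgumentException("no \"" + keyField + "\" in " + record);
        }
        return Arrays.<Object>asList(m.group(1), record);
    }
}
```

Assuming storm-kafka's default raw scheme, which I believe delivers each message as a byte[] in a single field, the bolt's execute() could read it with input.getBinary(0). You would subscribe one extractor to each spout (e.g. with shuffleGrouping), then point the JoinBolt and its fieldsGrouping calls at the extractors' component IDs instead of the spouts', using a distinct record field name per side (such as "person" and "dept") so the select list is unambiguous.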