I am quite new to Apache Storm and have been experimenting with the Trident topology for Kafka, i.e. TransactionalTridentKafkaSpout. Everything works fine except the Storm UI: even though I have not produced any data to my topic, the Storm UI keeps showing invalid emitted/transferred values, i.e. the counts keep increasing even when there is no data in the topic. I have tried deleting the data/logs stored in ZooKeeper, Storm, and Kafka, recreating the Kafka topics, and setting
topology.stats.sample.rate: 1.0
but the problem still persists.
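(For reference, the equivalent programmatic form of that setting uses the Config constant; 1.0 disables sampling, whereas the default of 0.05 samples tuples and scales the counts by 20, which can make the displayed numbers look off.)

config.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 1.0); // count every tuple exactly instead of sampling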
I also came across a tool called Capillary for monitoring a Storm cluster. I am using the properties below:
capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot="192.168.125.20:/home/storm/kafka_2.11-0.8.2.0"
capillary.storm.zkroot="192.168.125.20:/home/storm/apache-storm-0.9.3"
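I wonder whether the zkroot values should be ZooKeeper paths (the chroot where Kafka registers its brokers, and the path where the Trident spout stores its offsets) rather than filesystem install paths; my guess at the expected format (not verified against Capillary's docs) would be something like:
capillary.zookeepers="192.168.125.20:2181"
capillary.kafka.zkroot=""
capillary.storm.zkroot="transactional"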
I am using Kafka's embedded ZooKeeper here. Even this is not working; I get the exception below.
! @6mbg4bp7l - Internal server error, for (GET) [/] ->
play.api.Application$$anon$1: Execution exception[[JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
at [Source: java.io.StringReader@24adb083; line: 1, column: 9]]]
at play.api.Application$class.handleError(Application.scala:296) ~[com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
at play.api.DefaultApplication.handleError(Application.scala:402) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:205) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$14$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:202) [com.typesafe.play.play_2.10-2.3.4.jar:2.3.4]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) [org.scala-lang.scala-library-2.10.4.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('.' (code 46)): Expected space separating root-level values
at [Source: java.io.StringReader@24adb083; line: 1, column: 9]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:495) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1178) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
Any help on either would be great. Thanks in advance.
Configuration and source code snippet:
import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

// Storm/Trident cluster configuration
final Config config = new Config();
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);
config.setNumWorkers(2);
config.put(Config.NIMBUS_HOST, "192.168.125.20");
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
config.put(Config.DRPC_SERVERS, Arrays.asList("192.168.125.20"));
config.put(Config.DRPC_PORT, 3772);

// Kafka spout configuration: read "Test_Topic" via the ZooKeeper ensemble
final BrokerHosts zkHosts = new ZkHosts("192.168.125.20");
final TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "Test_Topic", "");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false; // resume from the last committed offset
final TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

// Topology: spout -> TestFunction -> PrintFilter
final TridentTopology topology = new TridentTopology();
topology.newStream("spout", kafkaSpout)
        .each(new Fields("str"), new TestFunction(), new Fields("test"))
        .each(new Fields("str"), new PrintFilter());
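TestFunction and PrintFilter are not shown above; roughly, they are a pass-through function and a logging filter. The sketches below (nested inside the topology class) are my reconstruction, not the actual bodies, followed by the submission call that the snippet omits:

import backtype.storm.StormSubmitter;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Sketch: pass the Kafka message string through as the "test" field.
public static class TestFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0)));
    }
}

// Sketch: log every tuple that actually arrives, keep all of them.
public static class PrintFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}

// Submission ("Test_Topology" is a placeholder name).
StormSubmitter.submitTopology("Test_Topology", config, topology.build());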
Topology Summary screenshot: (image not reproduced here)
Are you possibly seeing what I'd call the UI metric artifacts of Trident? These tuples also show up in the counters of the Storm UI:
Trident executes a batch every 500ms (by default). A batch involves a bunch of coordination messages going out to all the bolts to coordinate the batch (even if the batch is empty). So that's what you're seeing.
(source: Trident Kafka Spout - Ack Count Increasing Even Though No Messages Are Processed)
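One way to convince yourself: with the 3000 ms batch interval already set in your config, the emitted/transferred counters should tick up roughly every 3 seconds instead of every 500 ms, which confirms the growth is per-batch coordination traffic rather than real messages.

// A longer batch interval slows the "phantom" counter growth proportionally.
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 3000);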