I have setup a project trying to run standard "ExclamationTopology" on the in-memory version of the storm, triggered from the IntelliJ IDE. There goes my POM.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm</groupId>
<artifactId>sample</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>0.98.4-hadoop2</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<repositories>
<repository>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>central</id>
<url>http://repo1.maven.org/maven2/</url>
</repository>
<repository>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
<id>clojars</id>
<url>https://clojars.org/repo/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<configuration>
<mainClass>test.ExclamationTopology</mainClass>
<arguments>-local</arguments>
</configuration>
</plugin>
</plugins>
</build>
</project>
Along with the sample source code of my topology:
public class ExclamationTopology extends ConfigurableTopology {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
ConfigurableTopology.start(new ExclamationTopology(), args);
}
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
conf.setDebug(true);
String topologyName = "test";
conf.setNumWorkers(3);
if (args != null && args.length > 0) {
topologyName = args[0];
}
return submit(topologyName, conf, builder);
}
}
In order to be able to run the topology locally from within my IDE via Maven I included exec maven plugin. Then I use the following mvn command to run the application:
exec:java -Dexec.args=-local
However, I do get the following exception:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:110) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.<init>(ThriftClient.java:70) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.<init>(NimbusClient.java:158) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:113) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:211) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:391) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:163) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.topology.ConfigurableTopology.submit(ConfigurableTopology.java:94) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at test.ExclamationTopology.run(ExclamationTopology.java:69) [classes/:?]
at org.apache.storm.topology.ConfigurableTopology.start(ConfigurableTopology.java:70) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at test.ExclamationTopology.main(ExclamationTopology.java:47) [classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) [exec-maven-plugin-1.2.1.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBackoffConnect.java:64) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:56) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 20 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
at org.apache.thrift.transport.TSocket.open(TSocket.java:226) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 20 more
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method) ~[?:1.8.0_112]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79) ~[?:1.8.0_112]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_112]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_112]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_112]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[?:1.8.0_112]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_112]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_112]
at org.apache.thrift.transport.TSocket.open(TSocket.java:221) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 20 more
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:141)
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83)
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268)
at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599)
at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565)
The README for storm-starter is out of date. The examples don't run locally anymore, because ConfigurableTopology was changed to not support this here https://github.com/apache/storm/commit/b254ede46a25466749cd48ebd4bcb56dd791ec4a#diff-de7eab133732a8b5b97be6aa7328e392R92.
If you want to run it locally, you can use https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/LocalCluster.java, which should replace the call to submit in your topology code. Otherwise you'll have to set up a local Storm instance to run the topology (which is very easy, see https://storm.apache.org/releases/2.0.0-SNAPSHOT/Setting-up-a-Storm-cluster.html. The storm-starter README tells you how to submit to an installed cluster).
Edit: If you want to run it locally, another option would probably be to use the "storm local" command.
PS E:\apache-storm-2.0.0-SNAPSHOT\bin> ./storm help local
Syntax: [storm local topology-jar-path class ...]
Runs the main method of class with the specified arguments but pointing to a local cluster
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
and others will interact with a local cluster instead of the one configured by default.
Most options should work just like with the storm jar command.
local also adds in the option --local-ttl which sets the number of seconds the
local cluster will run for before it shuts down.
--java-debug lets you turn on java debugging and set the parameters passed to -agentlib:jdwp on the JDK
--java-debug transport=dt_socket,address=localhost:8000
will open up a debugging server on port 8000.
The documentation for local mode has been updated in the Storm repo, but hasn't yet made it to the website. See https://github.com/apache/storm/blob/master/docs/Local-mode.md for the new docs.