Search code examples
mavenintellij-ideaapache-storm

Apache Storm intellij local mode - NimbusLeaderNotFoundException


I have setup a project trying to run standard "ExclamationTopology" on the in-memory version of the storm, triggered from the IntelliJ IDE. There goes my POM.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>storm</groupId>
    <artifactId>sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hbase.version>0.98.4-hadoop2</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-client</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-server</artifactId>
            <version>2.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>


    <repositories>
        <repository>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
        </repository>
        <repository>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>test.ExclamationTopology</mainClass>
                    <arguments>-local</arguments>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Along with the sample source code of my topology:

public class ExclamationTopology extends ConfigurableTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

    }

    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new ExclamationTopology(), args);
    }

    protected int run(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        conf.setDebug(true);

        String topologyName = "test";

        conf.setNumWorkers(3);


        if (args != null && args.length > 0) {
            topologyName = args[0];
        }

        return submit(topologyName, conf, builder);
    }
}

In order to be able to run the topology locally from within my IDE via Maven I included exec maven plugin. Then I use the following mvn command to run the application:

exec:java -Dexec.args=-local

However, I do get the following exception:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:110) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.ThriftClient.<init>(ThriftClient.java:70) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.utils.NimbusClient.<init>(NimbusClient.java:158) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:113) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:211) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:391) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:163) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.topology.ConfigurableTopology.submit(ConfigurableTopology.java:94) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at test.ExclamationTopology.run(ExclamationTopology.java:69) [classes/:?]
    at org.apache.storm.topology.ConfigurableTopology.start(ConfigurableTopology.java:70) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at test.ExclamationTopology.main(ExclamationTopology.java:47) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) [exec-maven-plugin-1.2.1.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
    at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBackoffConnect.java:64) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:56) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 20 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
    at org.apache.thrift.transport.TSocket.open(TSocket.java:226) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 20 more
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method) ~[?:1.8.0_112]
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79) ~[?:1.8.0_112]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_112]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_112]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_112]
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[?:1.8.0_112]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_112]
    at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_112]
    at org.apache.thrift.transport.TSocket.open(TSocket.java:221) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    ... 20 more
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:141)
    at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83)
    at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268)
    at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599)
    at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565)

Solution

  • The README for storm-starter is out of date. The examples don't run locally anymore, because ConfigurableTopology was changed to not support this here https://github.com/apache/storm/commit/b254ede46a25466749cd48ebd4bcb56dd791ec4a#diff-de7eab133732a8b5b97be6aa7328e392R92.

    If you want to run it locally, you can use https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/LocalCluster.java, which should replace the call to submit in your topology code. Otherwise you'll have to set up a local Storm instance to run the topology (which is very easy, see https://storm.apache.org/releases/2.0.0-SNAPSHOT/Setting-up-a-Storm-cluster.html. The storm-starter README tells you how to submit to an installed cluster).

    Edit: If you want to run it locally, another option would probably be to use the "storm local" command.

    PS E:\apache-storm-2.0.0-SNAPSHOT\bin> ./storm help local
    Syntax: [storm local topology-jar-path class ...]
    
    Runs the main method of class with the specified arguments but pointing to a local cluster
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html)
    and others will interact with a local cluster instead of the one configured by default.
    
    Most options should work just like with the storm jar command.
    
    local also adds in the option --local-ttl which sets the number of seconds the
    local cluster will run for before it shuts down.
    
    --java-debug lets you turn on java debugging and set the parameters passed to -agentlib:jdwp on the JDK
    --java-debug transport=dt_socket,address=localhost:8000
    will open up a debugging server on port 8000.
    

    The documentation for local mode has been updated in the Storm repo, but hasn't yet made it to the website. See https://github.com/apache/storm/blob/master/docs/Local-mode.md for the new docs.