
Run a local cluster under a nondefault classloader


A Local Cluster from a web classloader

I'm trying to run a local cluster from a web container (yes, it's only for dev & testing purposes) and am having difficulty with classloaders.

Direct approach

When I do it the easy and recommended way,

ILocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, stormConf, topology);

I get rewarded with

Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
    at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

This is because the classloader used to load and instantiate the StormTopology is an instance of Jetty's WebAppClassLoader, but the (sub)process spawned by LocalCluster.submitTopology() apparently uses the system classloader. I confirmed this by logging the classloader in a static initializer block of SomeFilteringBolt: the class is indeed loaded twice, once by each classloader. Because the JVM identifies a class by its name together with the classloader that defined it, the bolt instance created under the WebAppClassLoader cannot later be cast to the IBolt type resolved through the system classloader.
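
The "same name, different class" effect can be reproduced without Storm or Jetty. The sketch below is illustrative only: IBolt and FilteringBolt are hypothetical stand-ins for org.apache.storm.task.IBolt and SomeFilteringBolt, and ChildFirstLoader is a hypothetical, simplified imitation of a child-first webapp loader. It assumes the classes are compiled to .class files on the classpath (javac, then java), because the loader re-reads the bytes of those files.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ClassIdentityDemo {

    /** Hypothetical stand-in for org.apache.storm.task.IBolt. */
    public interface IBolt {
        String name();
    }

    /** Hypothetical stand-in for my.company.storm.bolt.SomeFilteringBolt. */
    public static class FilteringBolt implements IBolt {
        public String name() { return "filtering"; }
    }

    /** Hypothetical child-first loader: defines the listed classes itself instead of delegating. */
    static final class ChildFirstLoader extends ClassLoader {
        private final Set<String> isolated;

        ChildFirstLoader(ClassLoader parent, Set<String> isolated) {
            super(parent);
            this.isolated = isolated;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!isolated.contains(name)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name); // same bytes the parent would use
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Instantiates FilteringBolt (and its IBolt) through a fresh child-first loader. */
    public static Object newIsolatedBolt() {
        Set<String> isolated = new HashSet<>(Arrays.asList(
                IBolt.class.getName(), FilteringBolt.class.getName()));
        ClassLoader loader = new ChildFirstLoader(ClassIdentityDemo.class.getClassLoader(), isolated);
        try {
            return loader.loadClass(FilteringBolt.class.getName())
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object bolt = newIsolatedBolt();
        System.out.println("same name:  " + bolt.getClass().getName().equals(FilteringBolt.class.getName()));
        System.out.println("same class: " + (bolt.getClass() == FilteringBolt.class));
        try {
            IBolt cast = (IBolt) bolt; // IBolt here is the application loader's copy
            System.out.println("cast worked: " + cast.name());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Both classes have identical names and bytes, yet the cast fails because the isolated FilteringBolt implements the isolated IBolt, not the one the calling code was linked against.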

Expected behaviour

Now, this is surprising to me, as I thought Storm would serialize the StormTopology instance, "send" it locally, deserialize it and run it. If it did that, it would definitely work. Instead, it seems to use the provided StormTopology instance directly, which is problematic under a different classloader.
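
The expected behaviour amounts to a serialization round trip in which the receiving side re-resolves every class by name through a loader it chooses. A minimal sketch of that general technique with plain Java serialization follows; it assumes nothing about how Storm actually ships topologies internally, and DemoBolt and LoaderObjectInputStream are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class LoaderAwareRoundTrip {

    /** Hypothetical ObjectInputStream that resolves class names through a caller-chosen loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc); // fallback, e.g. for primitive names such as "int"
            }
        }
    }

    /** Hypothetical serializable stand-in for a bolt. */
    public static class DemoBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String name;

        public DemoBolt(String name) { this.name = name; }
        public String name() { return name; }
    }

    public static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader receiver = LoaderAwareRoundTrip.class.getClassLoader();
        byte[] wire = serialize(new DemoBolt("some-filter"));
        DemoBolt copy = (DemoBolt) deserialize(wire, receiver);
        System.out.println(copy.name() + " resolved via " + copy.getClass().getClassLoader());
    }
}
```

Because only names travel in the byte stream, the copy's class is whatever the receiving loader resolves, which is why a serialize/deserialize step would sidestep the mismatch.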

What I have tried since

I tried setting these to true to force Storm to serialize my topology locally. No change.

I tried running the LocalCluster under the system classloader:

ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
try {
    // Temporarily make the system classloader the thread's context classloader
    Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());

    Config topologyConf = createTopologyConfig();
    Map<String, Object> stormConf = createStormConfig(topologyConf);
    StormTopology topology = createTopology(topologyConf);

    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);
} finally {
    // Always restore the webapp's original context classloader
    Thread.currentThread().setContextClassLoader(originalClassloader);
}
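
The save/switch/restore pattern above can be factored into a small reusable helper. This is a sketch, not a Storm API; ContextClassLoaders and callWith are hypothetical names. In the snippet above it would wrap the create* calls and submitTopology inside a lambda that returns null.

```java
import java.util.concurrent.Callable;

public final class ContextClassLoaders {

    private ContextClassLoaders() {
    }

    /**
     * Runs action with loader installed as the current thread's context classloader and
     * restores the previous context classloader afterwards, even if action throws.
     */
    public static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        ClassLoader inside = callWith(system, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("inside == system: " + (inside == system));
        System.out.println("restored:         " + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

Note that this only changes the thread's context classloader. Classes referenced directly from the calling code, such as LocalCluster itself, are still resolved through the loader that defined the calling class (here the webapp's loader).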

This actually got me a bit further:

Thread  died: java.lang.ExceptionInInitializerError
    at clojure.core__init.__init0(Unknown Source)
    at clojure.core__init.<clinit>(Unknown Source)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at clojure.lang.RT.classForName(RT.java:2154)
    at clojure.lang.RT.classForName(RT.java:2163)
    at clojure.lang.RT.loadClassForName(RT.java:2182)
    at clojure.lang.RT.load(RT.java:436)
    at clojure.lang.RT.load(RT.java:412)
    at clojure.lang.RT.doInit(RT.java:454)
    at clojure.lang.RT.<clinit>(RT.java:330)
    at clojure.lang.Namespace.<init>(Namespace.java:34)
    at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
    at clojure.lang.Var.internPrivate(Var.java:151)
    at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
    at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
    ... 10 more
Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
    at clojure.lang.Var$Unbound.throwArity(Var.java:43)
    at clojure.lang.AFn.invoke(AFn.java:32)
    at clojure.lang.Var.invoke(Var.java:379)
    at clojure.lang.RT.doInit(RT.java:467)
    at clojure.lang.RT.<clinit>(RT.java:330)
    ... 18 more

Wat?!

The question

How can I run a Storm topology in local mode safely from a classloader other than the system classloader?

I'm running on Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.


Solution

  • Upgrading to Apache Storm 1.0.3 fixed this, somewhat magically.

    It works even without TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE. There is no trace of the fix in the release notes, so I was not able to track it down to a specific code change. Anyway, we're happy that it works as expected now.