I am trying to set up a simple akka-http 2.4.2 project to test it out, but I am failing to do so.
My build.sbt:
import NativePackagerHelper._

lazy val akkaVersion = "2.4.2"

lazy val root = (project in file(".")).
  settings(
    name := "akkTest",
    version := "0.1",
    scalaVersion := "2.11.7")

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
)

enablePlugins(JavaServerAppPackaging)
My code snippet in Main.scala:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

object Main extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val serverSource =
    Http().bind(interface = "localhost", port = 8080)

  val bindingFuture =
    serverSource.to(Sink.foreach { connection => // foreach materializes the source
      println("Accepted new connection from " + connection.remoteAddress)
    }).run()
}
Execution throws the following error:
Uncaught error from thread [default-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[default]
java.lang.NoSuchMethodError: akka.actor.ActorCell.addFunctionRef(Lscala/Function2;)Lakka/actor/FunctionRef;
at akka.stream.stage.GraphStageLogic$StageActor.<init>(GraphStage.scala:143)
at akka.stream.stage.GraphStageLogic.getStageActor(GraphStage.scala:904)
at akka.stream.impl.io.ConnectionSourceStage$$anon$1.preStart(TcpStages.scala:56)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:468)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:363)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)
at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)
at akka.actor.ActorCell.create(ActorCell.scala:580)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
This must be something in my environment, but I don't know how to track the issue down. I am using JDK 1.8u71. The assembly output lists the following jars:
[info] Done updating.
[info] Including from cache: ssl-config-akka_2.11-0.1.3.jar
[info] Including from cache: reactive-streams-1.0.0.jar
[info] Including from cache: akka-http-spray-json-experimental_2.11-2.4.2.jar
[info] Including from cache: config-1.3.0.jar
[info] Including from cache: spray-json_2.11-1.3.2.jar
[info] Including from cache: ssl-config-core_2.11-0.1.3.jar
[info] Including from cache: scala-parser-combinators_2.11-1.0.4.jar
[info] Including from cache: scala-java8-compat_2.11-0.7.0.jar
[info] Including from cache: akka-parsing_2.11-2.4.2.jar
[info] Including from cache: akka-http-experimental_2.11-2.4.2.jar
[info] Including from cache: akka-actor_2.11-2.4.2.jar
[info] Including from cache: akka-http-core_2.11-2.4.2.jar
[info] Including from cache: akka-stream_2.11-2.4.2.jar
[info] Including from cache: scala-library-2.11.7.jar
Bear in mind that I only depend on artifacts of the same Akka version.
The program works fine with sbt run, but fails when running the assembled jar with my own Scala launcher.
The problem is that Spark uses Akka internally. As long as you run your Spark job as a self-contained application (e.g. with sbt run), this is not a problem, since your own version of Akka is used. Things change, however, as soon as you submit your application to a cluster with spark-submit. The Spark classloader will then pick Spark's internal version of Akka over the Akka implementation bundled in your sparkJob.jar. The NoSuchMethodError above therefore comes from akka-stream_2.11-2.4.2 calling into the akka-actor_2.11-2.3.x.jar used by Spark, instead of the akka-actor_2.11-2.4.2.jar bundled in your job: the method addFunctionRef is a fairly recent addition and is not present in earlier versions of Akka.
You can verify this by setting a breakpoint at the place where the exception occurs (or by using an exception breakpoint). With the application suspended at the problematic location in GraphStage, evaluate

    materializer.supervisor.getClass().getResource("ActorCell.class")

This prints the location of the class file that ActorCell was loaded from.
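If you prefer checking without a debugger, the same idea can be scripted. The sketch below (WhichJar and locationOf are my own illustrative names, not an Akka API) prints where a class's bytecode was loaded from; it uses scala.Option as a stand-in so it runs without Akka on the classpath, but the same call with classOf[akka.actor.ActorCell] would reveal which Akka jar wins on the Spark classpath:

```scala
// Sketch: report where a class's bytecode was loaded from, to detect a
// stale jar shadowing the one you bundled. WhichJar/locationOf are
// illustrative names, not part of any library.
object WhichJar {
  def locationOf(cls: Class[_]): String = {
    // Binary name -> resource path, e.g. "scala/Option.class"
    val resource = cls.getName.replace('.', '/') + ".class"
    Option(cls.getClassLoader)                        // null for bootstrap classes
      .flatMap(cl => Option(cl.getResource(resource)))
      .map(_.toString)
      .getOrElse("<bootstrap classloader>")
  }

  def main(args: Array[String]): Unit =
    // For the Spark case you would pass classOf[akka.actor.ActorCell]
    println(locationOf(classOf[Option[_]]))
}
```

Running this prints a URL ending in scala/Option.class, pointing at the scala-library jar that actually served the class.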
To make sure that you are isolated from the Akka version that Spark uses internally, you can use the --driver-class-path option of spark-submit, like this:

    spark-submit --class MyJob \
        --driver-class-path akka-actor_2.11-2.4.2.jar \
        --master spark://master:7077 \
        sparkJob.jar
If you do this, I also recommend setting akka-actor to the "test,provided" scope in your build.sbt, so that you don't also include akka-actor in sparkJob.jar.
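A sketch of that dependency change, reusing the akkaVersion value from the build above (the "test,provided" configuration string is standard sbt; dependencies scoped provided are left out of the assembled jar):

```scala
// build.sbt fragment: akka-actor is available for compilation and tests,
// but excluded from the assembled sparkJob.jar; at runtime it is supplied
// via --driver-class-path instead.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion % "test,provided",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
)
```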