Why doesn't an Akka Streams application terminate normally?


I wrote this simple application using the Alpakka Cassandra library:

package com.abhi

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._

object MyApp extends App {
   implicit val actorSystem = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   implicit val session = Cluster
      .builder
      .addContactPoints(List("localhost") :_*)
      .withPort(9042)
      .withCredentials("foo", "bar")
      .build
      .connect("foobar")
   val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
   val source = CassandraSource(stmt)
   val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.getLong(1)))
   val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
      s =>
      import GraphDSL.Implicits._
      source.take(10) ~> toFoo ~> s
      ClosedShape
   })
   // let us run the graph
   val future = graph.run()
   import actorSystem.dispatcher
   future.onComplete{_ =>
      session.close()
      Await.result(actorSystem.terminate(), Duration.Inf)
   }
   Await.result(future, Duration.Inf)
   System.exit(0)
}

case class Foo(col1: Long, col2: Long)

This application runs exactly as expected: it prints 10 rows on the screen.

But after that it hangs. When the System.exit(0) call is executed, it throws an exception:

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"

Even then, the application does not stop running. It just hangs.

I don't understand why this application doesn't terminate normally (in fact, it shouldn't even need the System.exit(0) call).

The only way to exit this application is with Ctrl+C.


Solution

  • This might happen because sbt, by default, runs your code inside its own JVM instance rather than in a separate one. A System.exit call would therefore shut down sbt's JVM, so sbt traps it, which produces the exception above.

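    Did you try setting fork in run := true somewhere in your sbt build (run / fork := true in the newer sbt 1.x slash syntax)? With forking enabled, sbt runs the application in a separate JVM.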

    I'm also not sure it is a good idea to use actorSystem.dispatcher to execute your onComplete callback (because you use it to wait for the termination of the actor system itself).

    Something you could try instead:

    import actorSystem.dispatcher
    future.onComplete{ _ =>
      session.close()
      actorSystem.terminate()
    }
    Await.result(actorSystem.whenTerminated, Duration.Inf)
    

    Note that the JVM will exit without you needing to call System.exit when the only threads left are daemon threads (see, for example, "What is Daemon thread in Java?").
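
To illustrate the note about daemon threads above, here is a minimal sketch that uses only the JDK Thread API. The names DaemonDemo and sleeper are hypothetical and are not part of Akka or the Cassandra driver. It assumes the program runs in its own JVM (for example, with forking enabled in sbt).

```scala
object DaemonDemo {
  // Hypothetical helper: builds (but does not start) a thread that sleeps for a minute.
  def sleeper(daemon: Boolean): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = Thread.sleep(60000L)
    })
    t.setDaemon(daemon) // must be set before start()
    t
  }

  def main(args: Array[String]): Unit = {
    val background = sleeper(daemon = true)
    background.start()
    println(s"main is done; background thread is daemon: ${background.isDaemon}")
    // main returns here. The only thread still running is a daemon thread,
    // so the JVM can exit without an explicit System.exit call.
    // With sleeper(daemon = false), the JVM would stay alive until the sleep ends.
  }
}
```

If an application still hangs after its main method returns, a thread dump (for example, with jstack) shows which non-daemon threads are still alive.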