Why doesn't an Akka Streams application terminate normally?

I wrote this simple application using the Alpakka Cassandra library:

package com.abhi

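import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}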
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._

object MyApp extends App {
   implicit val actorSystem = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   implicit val session = Cluster
      .builder()
      .addContactPoints(List("localhost") :_*)
      .withCredentials("foo", "bar")
      .build()
      .connect()
   val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
   val source = CassandraSource(stmt)
   val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.getLong(1)))
   val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
      s =>
      import GraphDSL.Implicits._
      source.take(10) ~> toFoo ~> s
      ClosedShape
   })
   // let us run the graph
   val future = graph.run()
   import actorSystem.dispatcher
   future.onComplete{_ =>
      Await.result(actorSystem.terminate(), Duration.Inf)
   }
   Await.result(future, Duration.Inf)
   System.exit(0)
}

case class Foo(col1: Long, col2: Long)

This application runs exactly as expected: it prints 10 rows on the screen.

After that, however, it hangs. When the System.exit(0) call is executed, it throws an exception:

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"

Even then, the application does not stop running; it just hangs.

I don't understand why this application doesn't terminate normally (in fact, it shouldn't even need the System.exit(0) call).

The only way to exit this application is via Ctrl+C.


  • This might happen because sbt, by default, runs your code inside its own JVM instance. Your System.exit call would then try to exit sbt's JVM, which sbt traps, giving the above result.

    Did you try setting fork in run := true (written run / fork := true in newer sbt versions) somewhere in your sbt build?

    I'm also not sure it is a good idea to use actorSystem.dispatcher to execute your onComplete callback (because you use it to wait for the termination of the actor system itself).

    Something you could try instead:

    import actorSystem.dispatcher
    future.onComplete{ _ =>
      actorSystem.terminate()
    }
    Await.result(actorSystem.whenTerminated, Duration.Inf)

    Note that the JVM will exit without you needing to call System.exit when the only threads left are daemon threads (see, for example, "What is Daemon thread in Java?").
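
To illustrate the daemon-thread point, here is a minimal sketch that uses only the JDK Thread API and no Akka at all. The object name DaemonThreadDemo and the helper sleeper are made up for this example. It starts one daemon thread that sleeps for a minute and one non-daemon thread that sleeps for a second; the process should exit after roughly one second, because only the non-daemon thread keeps the JVM alive.

```scala
object DaemonThreadDemo {
  // Hypothetical helper: builds (but does not start) a thread that sleeps for `millis`.
  def sleeper(name: String, daemon: Boolean, millis: Long): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = Thread.sleep(millis)
    }, name)
    t.setDaemon(daemon) // the daemon flag must be set before start()
    t
  }

  def main(args: Array[String]): Unit = {
    // A daemon thread does not keep the JVM alive on its own.
    sleeper("background-daemon", daemon = true, millis = 60000L).start()
    // A non-daemon thread does: the JVM waits until it has finished.
    sleeper("worker", daemon = false, millis = 1000L).start()
    println("main is done; the JVM exits once 'worker' finishes")
  }
}
```

By the same logic, any non-daemon thread that is still running when main returns (for example, one owned by a thread pool or client that was never shut down) would keep the application from exiting. Whether that is what happens in the application from the question is an assumption worth checking with a thread dump.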