scala opencv akka akka-stream

Akka Streams: KillSwitch for custom SourceShape emitting frames from video file


Following the documentation on KillSwitch, I wrote this simple example that stops a Source emitting an infinite stream of numbers.

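import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

object KillSwitchSample extends App {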
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val killSwitch = KillSwitches.shared("switch")

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val flow = builder.add(Flow[Int].map(_ * 2))
    mySource.via(killSwitch.flow) ~> flow ~> Sink.foreach(println)
    ClosedShape
  }).run()

  Thread.sleep(200)

  killSwitch.shutdown()
}

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 1

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

My use case differs in that the Source emits frames from a video file using OpenCV. With the same KillSwitch setup, the stream keeps running after killSwitch.shutdown() is called. Why is the upstream not cancelled? What am I missing here?

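// Same Akka imports as in the first example, plus:
import java.io.File
import org.slf4j.{Logger, LoggerFactory}
// VideoCapture and Mat come from the OpenCV Java bindings.
// Video, Frame and MediaConversion are project classes not shown here.

object KillSwitchMinimalMain extends App {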
  val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))
  System.load(libopencv_java(0))

  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val videoFile = Video("Video.MOV")

  val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile)
  val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph)

  val killSwitch = KillSwitches.shared("switch")

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) })

    videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println)

    ClosedShape
  }).run()

  Thread.sleep(200)

  killSwitch.shutdown()
}

class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] {
  val out: Outlet[Frame] = Outlet("VideoSource")
  override val shape: SourceShape[Frame] = SourceShape(out)
  val log: Logger = LoggerFactory.getLogger(getClass)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val capture = new VideoCapture()
      private val frame = new Mat()
      private var videoPos: Double = _

      override def preStart(): Unit = {
        capture.open(videoFile.filepath)
        readFrame()
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, Frame(videoPos, frame))
          readFrame()
        }
      })

      private def readFrame(): Unit = {
        if (capture.isOpened) {
          // Property id 1 is CAP_PROP_POS_FRAMES: the 0-based index of the next frame to be decoded.
          videoPos = capture.get(1)
          log.info(s"reading frame $videoPos")
          capture.read(frame)
        }
      }
    }
}

The console output, as requested by @svezfaz:

13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0
javafx.scene.image.WritableImage@64b06f30
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0
javafx.scene.image.WritableImage@1e011979
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0
javafx.scene.image.WritableImage@52c9a35c
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0
javafx.scene.image.WritableImage@13968f9e
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0
javafx.scene.image.WritableImage@6ab783be
// and so on ..

Solution

  • The problem is that your custom stage blocks. I don't know the OpenCV API, but I'm guessing it happens when you call capture.read(frame). Unless instructed otherwise, the whole graph runs in a single actor, so blocking in your stage blocks that actor and everything else fused into it.

    Forcing an async boundary after your source, so that the source runs in its own actor, should do the trick.

    Also note that you don't need the GraphDSL here; everything can be written more compactly with the via/to DSL.

    Solution attempt below:

    object KillSwitchMinimalMain extends App {
      val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java"))
    
      System.load(libopencv_java(0))
      implicit val actorSystem = ActorSystem()
      implicit val materializer = ActorMaterializer()
    
      val videoFile = Video("Video.MOV")
    
      val killSwitch = KillSwitches.shared("switch")
      val matConversion = Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) }
    
      Source.fromGraph(new VideoSource(videoFile))
        .async
        .via(killSwitch.flow)
        .via(matConversion)
        .runForeach(println)
    
      Thread.sleep(200)
    
      killSwitch.shutdown()
    }
    

    For more info on the concurrency model underlying Akka Streams you can read this blogpost.
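
To try the idea without OpenCV, here is a minimal sketch of the same setup. SlowSource, its delayMillis parameter, and runAndShutdown are made-up names for illustration: Thread.sleep stands in for a blocking capture.read(frame), and the sketch assumes the same Akka version as the question (ActorMaterializer is deprecated in newer releases). It only shows where the async boundary goes; it is not a drop-in replacement for VideoSource.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, KillSwitches, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for VideoSource: Thread.sleep imitates a blocking capture.read(frame).
class SlowSource(delayMillis: Long) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("SlowSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 0

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          Thread.sleep(delayMillis) // blocks the actor that runs this stage
          counter += 1
          push(out, counter)
        }
      })
    }
}

object AsyncBoundarySketch {
  // Runs the slow source behind an async boundary, then shuts the stream down
  // via the shared kill switch and waits for the sink to complete.
  def runAndShutdown(runForMillis: Long): Done = {
    implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val killSwitch = KillSwitches.shared("switch")
      val done: Future[Done] =
        Source.fromGraph(new SlowSource(delayMillis = 50))
          .async // the source runs in its own actor from here on
          .via(killSwitch.flow)
          .runForeach(println)

      Thread.sleep(runForMillis)
      killSwitch.shutdown()
      Await.result(done, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = runAndShutdown(500)
}
```

With the boundary in place, the kill switch and the sink run in a different actor from the blocking source, so the downstream side can complete while the source is still sleeping. The source itself only sees the cancellation once its current blocking work returns, so it may still do a little more work after shutdown() has been called.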