FS2 passing resource (or effect) as a state


I'm trying to implement an application that controls a camera. Camera commands are represented as a stream of CameraMessage objects:

sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage

...

val s: Stream[F, CameraMessage] = ???

Let's say I have a test stream that emits "Record", then "StopRecording" 20 seconds later, then another "Record" 20 seconds after that, and so on. The input stream is infinite.

When the app consumes "Record", it should create an instance of a GStreamer pipeline (i.e. it is an effect) and run it; on "StopRecording" it stops the pipeline and closes it. On each subsequent "Record" the pattern is repeated with a new GStreamer pipeline.

The problem is that I need to pass an instance of an impure, mutable object between the handlers of stream events.

The FS2 documentation suggests using chunks to make a stream stateful, so I tried:


def record(gStreamerPipeline: String, fileName: String)(implicit sync: Sync[F]): F[Pipeline] = {
  ... create and open pipeline ...
}

def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
  ... stop pipeline, release resources ...
}

def effectPipe(pipelineDef: String)(implicit L: Logger[F]): Pipe[F, CameraMessage, F[Unit]] = {
    type CameraSessionHandle = Pipeline
    type CameraStream = Stream[F, CameraSessionHandle]

    s: Stream[F, CameraMessage] =>
      s.scanChunks(Stream[F, CameraSessionHandle]()) {
        case (s: CameraStream, c: Chunk[CameraMessage]) =>
          c.last match {
            case Some(Record(fileName)) =>
              (Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
            case Some(StopRecording) =>
              (Stream.empty, Chunk(s.compile.drain))
            case _ =>
              (s, Chunk.empty)
          }
      }
  }

The problem with this code is that the actual recording does not start on the 'Record' event but only when the effect for the whole chunk is evaluated, i.e. when the 'StopRecording' message arrives the camera is turned on and then immediately turned off again.
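
To illustrate the deferred evaluation, here is a minimal sketch, assuming fs2 3.x on cats-effect 3, with a hypothetical in-memory log standing in for the camera (`LazyBracketDemo` and the log entries are made up for illustration). Building a `Stream.bracket` value only describes the acquisition; nothing runs until the stream is compiled and the resulting effect is executed.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.collection.mutable.ListBuffer

object LazyBracketDemo {
  def run(): List[String] = {
    // Hypothetical log that stands in for "camera on" / "camera off".
    val log = ListBuffer.empty[String]

    // Only a description: neither acquire nor release has run at this point.
    val session: Stream[IO, Unit] =
      Stream.bracket(IO(log += "acquire").void)(_ => IO(log += "release").void)
    log += "built"

    // Acquire and release both happen here, back to back.
    session.compile.drain.unsafeRunSync()
    log += "drained"

    log.toList
  }
}
```

In the attempt above the bracketed stream is stored as scan state on 'Record' and only drained on 'StopRecording', which matches the on-then-immediately-off behaviour.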

How can I pass a "state" without chunking? Or is there any other way to achieve the result I need?

This may be similar to "FS2 Stream with StateT[IO, _, _], periodically dumping state", but the difference is that the state in my case is not a pure data structure but a resource.


Solution

  • I was eventually able to solve it using the Mutable Reference pattern described in https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html

    Here is the code:

    import cats.effect._
    import cats.syntax.all._
    import fs2.Stream
    
    import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
    import scala.language.higherKinds
    
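    class FRef[F[_], T](implicit sync: Sync[F]) {
      // Starts out as null (the default value for T) until the first set.
      private var state: T = _
      def set(n: T): F[Unit] = sync.delay(this.state = n)
      // delay, not pure: the variable must be read when the effect runs, not when `get` is called.
      def get: F[T] = sync.delay(state)
    }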
    
    object FRef {
      def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
    }
    
    class CameraImpl(id: String) extends Camera {
    
      override def record(): Unit = {
        println(s"Recording $id")
      }
    
      override def stop(): Unit = {
        println(s"Stopping $id")
      }
    
      override def free(): Unit = {
        Thread.sleep(500)
        println(s"Freeing $id")
      }
    }
    
    object Camera {
      def apply(id: String) = new CameraImpl(id)
    }
    
    trait Camera {
      def record(): Unit
      def stop(): Unit
      def free(): Unit
    }
    
    sealed trait CameraMessage
    case class Record(recordId: String) extends CameraMessage
    case object StopRecording extends CameraMessage
    
    class Streamer[F[_]](implicit sync: Sync[F]) {
    
      def record(id: String): F[Camera] = {
        sync.delay {
          val r = Camera(id)
          r.record()
          r
        }
      }
    
      def stopRecording(pipe: Camera): F[Unit] = {
        sync.delay {
          pipe.stop()
          pipe.free()
        }
      }
    
      // Uses the class-level Sync[F]; each message is handled as soon as it arrives.
      def effectPipe(state: FRef[F, Option[Camera]]): Stream[F, CameraMessage] => Stream[F, Unit] = {
        s: Stream[F, CameraMessage] =>
          s.evalMap {
            case Record(fileName) =>
              for {
                r <- record(fileName)
                _ <- state.set(Some(r))
              } yield ()
            case StopRecording =>
              for {
                current <- state.get
                // traverse_ skips the stop when no camera is stored (state is None),
                // where Option.get would throw.
                _ <- current.traverse_(c => stopRecording(c))
                _ <- state.set(None)
              } yield ()
          }
      }
    }
    
    object FS2Problem extends IOApp {
      import scala.concurrent.duration._
    
      override def run(args: List[String]): IO[ExitCode] = {
    
        implicit val ec: ExecutionContextExecutor = ExecutionContext.global
    
        val streamer = new Streamer[IO]
    
        val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
          case (_, idx) =>
            idx % 2 match {
              case 0 =>
                Record(s"Record $idx")
              case _ =>
                StopRecording
            }
        }
    
        val ss = for {
          streamerState <- Stream.eval(FRef[IO, Option[Camera]])
          s <- s.through(streamer.effectPipe(streamerState))
        } yield ()
    
        ss.compile.drain.map(_ => ExitCode.Success)
      }
    }
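
The blog post linked above describes the same pattern with the library's own `Ref`, so the hand-rolled `FRef` is probably not needed. Below is a minimal sketch, assuming cats-effect 3 and fs2 3 (in cats-effect 2 `Ref` lives in `cats.effect.concurrent`). `FakeCamera`, `RefSketch` and the string log are hypothetical stand-ins for the real pipeline, used only so that the behaviour can be observed.

```scala
import cats.effect.{IO, Ref, Sync}
import cats.syntax.all._
import fs2.Stream

// Hypothetical stand-in for the real camera / GStreamer pipeline.
final class FakeCamera(val id: String)

sealed trait CameraMessage
final case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage

object RefSketch {
  // Handles one message at a time. `current` holds the camera that is
  // recording right now, if any; `log` records what was started and stopped.
  def handle[F[_]: Sync](
      current: Ref[F, Option[FakeCamera]],
      log: Ref[F, Vector[String]]
  ): CameraMessage => F[Unit] = {
    case Record(id) =>
      for {
        cam <- Sync[F].delay(new FakeCamera(id))
        _   <- log.update(_ :+ s"start $id")
        _   <- current.set(Some(cam))
      } yield ()
    case StopRecording =>
      // getAndSet clears the slot and returns the previous camera in one step;
      // traverse_ does nothing when the slot was already empty.
      current.getAndSet(None).flatMap(_.traverse_(cam => log.update(_ :+ s"stop ${cam.id}")))
  }

  // Runs a finite list of messages through evalMap and returns the log.
  def run(messages: List[CameraMessage]): IO[Vector[String]] =
    for {
      current <- Ref.of[IO, Option[FakeCamera]](None)
      log     <- Ref.of[IO, Vector[String]](Vector.empty)
      _       <- Stream.emits(messages).covary[IO].evalMap(handle[IO](current, log)).compile.drain
      out     <- log.get
    } yield out
}
```

Because the `Ref` starts as `None` rather than `null`, a `StopRecording` that arrives before any `Record` is simply ignored. This sketch does not stop a camera that is still recording when a second `Record` arrives; whether that case needs handling depends on the input stream.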