Search code examples
scalascala-catsfs2http4smonix

Model multiple function calls with a stream (in a safe, FP way)


Given a function A => IO[B] (aka Kleisli[IO, A, B]) that is meant to be called multiple times, and has side effects, like updating a DB, how to delegate such multiple calls of it into a stream (I guess Pipe[IO, A, B]) (fs2, monix observable/iterant)? Reason for this is to be able to accumulate state, batch calls together over a time window etc.

More concretely, http4s server requires a Request => IO[Response], so I am looking how to operate on streams (for the above benefits), but ultimately provide such a function to http4s.

I suspect it will need some correlation ID behind the scenes and I am fine with that, I am more interested in how to do it safely and properly from an FP perspective.

Ultimately, the signature I expect is probably something like:

Pipe[IO, A, B] => (A => IO[B]), such that calls to Kleisli are piped through the pipe.

As an afterthought, would it be at all possible to backpressure?


Solution

  • One idea is to model it with MPSC (Multiple Publisher Single Consumer). I'll give an example with Monix since I'm more familiar with it, but the idea stays the same even if you use FS2.

    object MPSC extends App {
    
      sealed trait Event
      object Event {
        // You'll need a promise in order to send the response back to user
        case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
      }
    
      // For backpressure, take a look at `PublishSubject`.
      val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)
    
      def pushEvent(num: Int) = {
        for {
          promise <- Deferred[Task, Int]
          _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
        } yield promise
      }
    
      // You get a list of events now since it is buffered
      // Monix has a lot of buffer strategies, check the docs for more details
      def processEvents(items: Seq[Event]): Task[Unit] = {
        Task.delay(println(s"Items: $items")) >>
          Task.traverse(items) {
            case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
          }.void
      }
    
      val app = for {
        // Start the stream in the background
        _ <- cs
          .bufferTimed(3.seconds) // Buffer all events within 3 seconds
          .filter(_.nonEmpty)
          .mapEval(processEvents)
          .completedL
          .startAndForget
    
        _ <- Task.sleep(1.second)
        p1 <- pushEvent(10)
        p2 <- pushEvent(20)
        p3 <- pushEvent(30)
    
        // Wait for the promise to complete, you'll do this for each request
        x <- p1.get
        y <- p2.get
        z <- p3.get
    
        _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
      } yield ()
    
      app.runSyncUnsafe()
    }