scala, http4s, cats-effect

"Spawn" concurrent effect in a WebSocket endpoint


I have the following code:

class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
  var queue = Queue.bounded[F, String](100)

  def createService(queue: Queue[F, String]): F[Unit] = ???

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      // How to "spawn" createService?

      toClientF.flatMap { toClient =>
        WebSocketBuilder[F].build(toClient, fromClient)
      }
  }
}

createService is a function which creates a new service. Creating a new service is a very complicated process: it involves triggering CI pipelines, waiting for them to finish, and then triggering more CI pipelines in the same fashion. The queue it receives will be used to report the operations currently being performed back to the browser.

I want to concurrently "spawn" createService and let it run until it finishes. At the same time, I want to return the WebSocket to the client immediately. In other words, I cannot block while "spawning" createService.

I'm stuck. I can only think of using shift, but then the next line in the for-comprehension would block waiting for createService to finish, and only then return the WebSocket to the client.

Is my approach wrong? What am I doing wrong?


Solution

  • Since F is an instance of ConcurrentEffect, you also have a Concurrent instance.

    You can therefore use Concurrent[F].start, which runs the effect in the background and immediately returns a Fiber handle to it (you can just ignore the Fiber if you don't need to cancel the operation or wait for it to complete).

      val service: HttpRoutes[F] = HttpRoutes.of[F] {
        case PUT -> Root / "services" =>
          val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
            case Text(t, _) => F.delay(println(t))
            case f => F.delay(println(s"Unknown type: $f"))
          }

          for {
            // Allocate the queue once so createService and the WebSocket share the same instance
            q <- queue
            toClient = q.dequeue.map(t => Text(t): WebSocketFrame)
            // start returns right away with a Fiber; createService keeps running in the background
            _ <- Concurrent[F].start(createService(q))
            websocket <- WebSocketBuilder[F].build(toClient, fromClient)
          } yield websocket
      }
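
    To see the non-blocking behaviour of start in isolation, here is a minimal sketch using plain IO instead of an abstract F. The names StartDemo and slowJob are made up for illustration, and slowJob is only a stand-in for createService; it assumes cats-effect is on the classpath.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import scala.concurrent.duration._

object StartDemo extends IOApp {
  // Hypothetical stand-in for createService: a slow background job
  def slowJob: IO[Unit] =
    IO.sleep(2.seconds).flatMap(_ => IO(println("job finished")))

  def run(args: List[String]): IO[ExitCode] =
    for {
      fiber <- slowJob.start            // returns immediately with a Fiber
      _     <- IO(println("started"))   // executes while slowJob is still sleeping
      _     <- fiber.join               // optional: wait here; fiber.cancel would abort the job instead
    } yield ExitCode.Success
}
```

    In the WebSocket route you would normally skip the join, since the whole point is to return the response while the job is still running. Keeping the Fiber around only matters if you later want to cancel the job or wait for its result.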