scala http4s fs2

How to shut down an fs2.StreamApp programmatically?


Extending StreamApp requires you to implement the stream method, which takes a requestShutdown parameter:

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

I provide the implementation for this and understand that args carries the command line arguments. I'm unsure, however, what supplies the requestShutdown parameter and what I can do with it.

Specifically, I'd like to invoke a graceful shutdown on a Stream[IO, ExitCode] that starts an http4s server (which blocks forever).

It looks like a Signal is needed and must be set? The underlying stream that I'm trying to 'get at' looks like this:

for {
  scheduler <- Scheduler[IO](corePoolSize = 1)
  exitCode  <- BlazeBuilder[IO]
                 .bindHttp(port, "0.0.0.0")
                 .mountService(services(scheduler), "/")
                 .serve
} yield exitCode

My stream def is here, and StreamAppSpec in the fs2 project has something relevant, but I can't work out how I'd adapt it.


Solution

  • You can think of the requestShutdown parameter supplied to the stream function as an action that, when executed, requests the termination of the program.

    Executing it therefore ends the program.

    Here is an example use:

      // Requires: import scala.concurrent.duration._ (for 10.seconds)
      override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
        for {
          scheduler <- Scheduler[IO](corePoolSize = 1)
          // Sleep for 10 seconds, then run the shutdown action.
          exitStream = scheduler.sleep[IO](10.seconds)
            .evalMap(_ => requestShutdown)
            .map(_ => ExitCode.Success)
          // Serve HTTP requests until the app is interrupted.
          serverStream = BlazeBuilder[IO]
            .bindHttp(port, "0.0.0.0")
            .mountService(services(scheduler), "/")
            .serve
          // Run both streams concurrently.
          result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
        } yield result
    

    In this scenario, we create two streams:

    • The first will wait for 10 seconds before triggering the effect of
      terminating the app.

    • The second will run the http4s server.

    We then join these two streams so that they run concurrently, which means the web server runs for 10 seconds before the other stream signals that the program should terminate.
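
    As a rough, library-free model of the mechanism described above, the sketch below uses only scala.concurrent. It is not fs2's implementation: MiniApp, Demo, and server are hypothetical names invented for illustration. The assumption it encodes is that the application wrapper, not your code, owns a shutdown signal and hands your program an action that sets it, then stops waiting on your program once that signal fires.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the application wrapper: it owns the shutdown
// signal and passes the user program an action that completes it.
object MiniApp {
  // Runs `program` until it finishes on its own or until the action it was
  // given is invoked, whichever happens first.
  def run(program: (() => Unit) => Future[Int]): Future[Int] = {
    val shutdown = Promise[Unit]()
    // The "requestShutdown" action; trySuccess makes repeated calls harmless.
    val requestShutdown: () => Unit = () => { shutdown.trySuccess(()); () }
    val finished = program(requestShutdown)
    // Whichever future completes first decides the exit code (0 on shutdown).
    Future.firstCompletedOf(Seq(finished, shutdown.future.map(_ => 0)))
  }
}

object Demo {
  // Stand-in for a server that never returns by itself.
  def server: Future[Int] = Promise[Int]().future

  def exitCode: Int = {
    val result = MiniApp.run { requestShutdown =>
      // Second concurrent task: wait briefly, then ask the app to stop.
      Future { Thread.sleep(200); requestShutdown() }
      server
    }
    Await.result(result, 5.seconds)
  }
}
```

    Calling Demo.exitCode blocks for about 200 ms and then returns the shutdown exit code, even though server never completes. One difference worth keeping in mind: a plain Future cannot be cancelled, so this model only stops waiting on the server, whereas the answer above describes requestShutdown as actually terminating the running program.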