Search code examples
scalascala-catscats-effecthttp4sfs2

Compose stream of effects for http response


I want to make a streaming JSON response via HTTP. The purpose is to send current time in a given city every second.

TL;DR: I need help with sending a result in effect F and a function, which returns this effect, should be called every 1 second. Is there a simple way you know of?

I've already tried several approaches and all of them do not work.

  1. This is the first one:
       def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
         val dsl = new Http4sDsl[F]{}
         import dsl._
    
         HttpRoutes.of[F] {
           case GET -> Root / "streaming" / city =>
    
             val throttling = Stream.awakeEvery[F](1.second)
    
             for {
               timeOrErr <- times.get(city.toUpperCase)
               resp <- timeOrErr match {
                 case Right(time) => Ok(throttling.map(_ => time.asJson))
                 case Left(error) => BadRequest(throttling.map(_ => error.asJson))
               }
             } yield resp
         }
       }

Here, my times.get(city.toUpperCase) function has the following signature:

    def get(city: String): F[Either[Times.CurrentTimeError, Times.CurrentTime]]

, where CurrentTimeError and CurrentTime are my custom case classes.The problem is that I get time only once in timeOrErr <- times.get(city.toUpperCase) line. So, every second it sends absolutely identical value (like, 2:43:31, 2:43:31, etc. And I want it to be 2:43:31, 2:43:32, etc.). And I have no idea how to make this function being called every second.

  1. Also, I tried to use a slightly different technique (and many others similar to this one):
       def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
         val dsl = new Http4sDsl[F]{}
         import dsl._
    
         HttpRoutes.of[F] {
           case GET -> Root / "streaming" / city =>
    
             val throttling = Stream.awakeEvery[F](1.second)
             val payload    = Stream(
               for {
                 timeOrErr <- times.get(city.toUpperCase)
                 resp      <- timeOrErr match {
                   case Right(time) => Ok(time.asJson)
                   case Left(error) => BadRequest(error.asJson)
                 }
               } yield resp
             )
    
             val stream = throttling.zipRight(payload)
    
             Ok(stream)
         }
       }

The problem here is the hell with nested monads. stream has the Stream[F, F[Response[F]]] type. And I can't make it a proper F[Response[F]] because fs2 Stream does not provide functions like sequence or traverse. If I try to return Ok(stream), then Circe cannot serialize F because it's abstract, so it is not even compiled.

  1. The 3rd approach is:
       HttpRoutes.of[F] {
         case GET -> Root / "streaming" / city =>
    
           val throttling = Stream.awakeEvery[F](1.second)
           val payload    = Stream(
             for {
               timeOrErr <- times.get(city.toUpperCase)
               resp      <- timeOrErr match {
                 case Right(time) => time.asJson
                 case Left(error) => error.asJson
               }
             } yield resp
           )
    
           val stream = throttling.map(_ => payload)
            
           Ok(stream)
       }

Well, number 3 is not compiled either. Primarily because I can't compose monads in payload. That is, case Right(time) => time.asJson and case Left(error) => error.asJson must be something like case Right(time) => SomethingThatcanBeUsedAsALastWrapperInThisForComprehension(time.asJson).

Unfortunately, official docs has little info about it. I'll be glad to hear any suggestions!


Solution

  • For what I could understand you want something like this:

    import cats.effect.{ContextShift, IO, Timer}
    import fs2.Stream
    import io.circe.Encoder
    import io.circe.generic.semiauto.deriveEncoder
    import org.http4s.{EntityEncoder, HttpRoutes}
    import org.http4s.circe.streamJsonArrayEncoderOf
    import org.http4s.dsl.Http4sDsl
    
    import scala.concurrent.duration._ // Provides the second extension method.
    
    type Error = String
    final case class Time(data: Long)
    
    trait Times {
      def get(cityName: String): IO[Either[Error, Time]]
    }
    
    object MyService extends Http4sDsl[IO] {
      // JSON Encoders.
      private implicit final val CirceEncoder: Encoder[Time] =
        deriveEncoder
      private implicit final val timeOrErrorCirceEncoder: Encoder[Either[Error, Time]] =
        Encoder.either[Error, Time](leftKey = "error", rightKey = "time")
      private implicit final val timeOrErrorEntityEncoder: EntityEncoder[IO, Stream[IO, Either[Error, Time]] =
        streamJsonArrayEncoderOf
      
      /** Returns the service itself. */
      def apply(times: Times)
               (implicit ev1: ContextShift[IO], ev2: Timer[IO]):HttpRoutes[IO] = HttpRoutes.of[IO] {
        Root / "streaming" / city =>
          Ok(Stream.awakeEvery[IO](1.second).evalMap(_ => times.get(city.toUpperCase)))
      }
    }