Tags: scala, http4s, fs2

A journey from akka-stream to fs2: how to define a stage like an akka-http flow in fs2 using http4s


I'm on a journey to deepen my knowledge of fs2, and I want to try fs2-kafka for a use case where I would replace Akka Streams. The idea is simple: read from Kafka, post the data via an HTTP request to a sink, then commit the offsets back to Kafka on success. So far I can't figure out the HTTP part. In Akka Streams / Akka HTTP you get a flow for that out of the box: https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

It integrates seamlessly with Akka Streams.

I was trying to see whether I could do something similar with http4s and fs2.
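For comparison, the shape of that Akka flow (each request paired with a pass-through context value T) can be mirrored in fs2 as a Pipe. This is a minimal sketch, assuming fs2 3.x, cats-effect 3 and http4s 0.23; HttpFlow and httpFlow are hypothetical names, not part of http4s. It returns only the response Status instead of the full response, because an http4s response is tied to its connection and has to be consumed inside the client's resource scope.

```scala
import cats.effect.IO
import fs2.Pipe
import org.http4s.{Request, Status}
import org.http4s.client.Client

object HttpFlow {
  // Hypothetical helper, loosely mirroring Akka HTTP's
  // Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]:
  // each request travels together with a pass-through context value of type T.
  def httpFlow[T](
      client: Client[IO],
      parallelism: Int
  ): Pipe[IO, (Request[IO], T), (Either[Throwable, Status], T)] =
    // parEvalMap keeps up to `parallelism` requests in flight and emits
    // results in the same order as the inputs.
    _.parEvalMap(parallelism) { case (request, context) =>
      // client.status runs the request and returns only the status; the
      // response is released afterwards. attempt turns failures into Left,
      // playing the role of Akka's Failure.
      client.status(request).attempt.map(result => (result, context))
    }
}
```

The context value is where a Kafka offset would go, so the stage can be followed directly by a commit step.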

Does anyone have a reference, code sample, blog post, or anything else that shows how to do this kind of integration? So far, the only thing I could think of was wrapping the stream inside the use method of the client resource, i.e.:

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

Even then, I am not sure about the overall approach.
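Wrapping the stream in use does work. A minimal sketch of that idea next to an equivalent stream-based variant, assuming http4s 0.23 with the Ember client (instead of Blaze) and a hypothetical pipeline function standing in for the real Kafka stream; ClientLifetime, clientResource, viaUse and viaStream are illustrative names only:

```scala
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder

object ClientLifetime extends IOApp.Simple {
  // Hypothetical stand-in for the real Kafka -> HTTP -> commit pipeline.
  def pipeline(client: Client[IO]): Stream[IO, Unit] = Stream.empty

  // One client, and therefore one connection pool, shared by the whole stream.
  val clientResource: Resource[IO, Client[IO]] = EmberClientBuilder.default[IO].build

  // Approach from the question: run the entire stream inside `use`.
  // The client is released once compile.drain completes.
  val viaUse: IO[Unit] =
    clientResource.use(client => pipeline(client).compile.drain)

  // Alternative: lift the resource into the stream itself. The client is
  // released when the stream terminates, and the result is still a Stream
  // that can be composed further before compiling.
  val viaStream: Stream[IO, Unit] =
    Stream.resource(clientResource).flatMap(client => pipeline(client))

  def run: IO[Unit] = viaStream.compile.drain
}
```

Both variants acquire the client once for the lifetime of the stream, rather than once per request; the stream-based form is usually more convenient when more stages are composed afterwards.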


Solution

  • The thing with the Typelevel ecosystem is that everything is just a library: you don't need examples of how several of them interact together, you just need to understand how each library works and the basic rules of composition.

    import cats.effect.{IO, IOApp, Resource}
    import fs2.Stream
    import fs2.kafka._ // CommittableConsumerRecord, commitBatchWithin
    import org.http4s.client.Client

    def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
      // Fill this based on the documentation of the client of your choice.
      // I would recommend the Ember client from http4s:
      // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
      ???
    }

    def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
      // Fill this based on the documentation of your client:
      // https://http4s.org/v0.23/client/
      // https://http4s.org/v0.23/api/org/http4s/client/client
      ???
    }

    def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
      // Fill this based on the documentation of fs2-kafka:
      // https://fd4s.github.io/fs2-kafka/docs/consumers
      ???
    }

    def program(/** whatever arguments you need */): Stream[IO, Unit] = {
      // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
      // acquire the client once, for the whole lifetime of the stream.
      Stream.resource(createClient(...)).flatMap { client =>
        getStreamOfRecords(...).evalMapFilter { committable =>
          // Send the record's value; keep its offset only if the request succeeded.
          sendHttpRequest(client)(data = committable.record.value).map { result =>
            if (result.isSuccess) Some(committable.offset)
            else None
          }
        }.through(commitBatchWithin(...)) // commit the kept offsets in batches
      }
    }

    object Main extends IOApp.Simple {
      override final val run: IO[Unit] =
        program(...).compile.drain
    }
    

    Note that I wrote all this off the top of my head after only a quick glance at the documentation, so you will need to change many things (especially types, like Data and Result), as well as tune things like error handling and when to commit back to Kafka.
    However, I expect this gives you an idea of how to structure your code.
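One possible way to fill in the HTTP part of the skeleton is sketched below, assuming http4s 0.23 with the Ember client, that the record value is a String posted as a plain-text body, and that the HTTP status is the only result needed (so Data = String and Result = Status). HttpSink and the extra sinkUri parameter are hypothetical additions for illustration.

```scala
import cats.effect.{IO, Resource}
import org.http4s.{Method, Request, Status, Uri}
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder

object HttpSink {
  // Assumptions: the record value is sent as a plain-text body, and the
  // HTTP status is all we need back.
  type Data = String
  type Result = Status

  // Ember client from http4s; its connection pool lives as long as the Resource.
  def createClient(): Resource[IO, Client[IO]] =
    EmberClientBuilder.default[IO].build

  // POST one record to the sink. `sinkUri` is a hypothetical extra parameter.
  // client.status returns only the response status and releases the
  // response afterwards, so nothing escapes the connection's scope.
  def sendHttpRequest(client: Client[IO], sinkUri: Uri)(data: Data): IO[Result] =
    client.status(Request[IO](Method.POST, sinkUri).withEntity(data))
}
```

With Result = Status, the result.isSuccess check in program type-checks as written, because http4s Status has an isSuccess method (true for 2xx codes).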
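The Kafka side of the skeleton might look like the sketch below, assuming fs2-kafka 3.x (where the consumer stream exposes subscribeTo and records), String keys and values, and a broker on localhost:9092. KafkaToHttp, the group id "http-sink" and the send parameter (any function returning true on success) are hypothetical.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._
import scala.concurrent.duration._

object KafkaToHttp {
  // Assumptions: String keys and values, local broker, hypothetical group id.
  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost:9092")
      .withGroupId("http-sink")

  // Committable records from one topic; nothing connects until the stream runs.
  def getStreamOfRecords(topic: String): Stream[IO, CommittableConsumerRecord[IO, String, String]] =
    KafkaConsumer.stream(consumerSettings).subscribeTo(topic).records

  // Same shape as program above: keep the offset only when `send` reports
  // success, then commit every 500 offsets or every 15 seconds.
  def program(topic: String, send: String => IO[Boolean]): Stream[IO, Unit] =
    getStreamOfRecords(topic)
      .evalMapFilter { committable =>
        send(committable.record.value).map(ok => if (ok) Some(committable.offset) else None)
      }
      .through(commitBatchWithin[IO](500, 15.seconds))
}
```

One design point to keep in mind: Kafka stores a committed position per partition, so committing a later successful offset also moves the group past any earlier record whose request failed. If failed records must not be skipped, they need to be retried (or routed elsewhere) before their offset is passed on.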