
Create Source from Lagom/Akka Kafka Topic Subscriber for Websocket


I want my Lagom subscriber-only service to subscribe to a Kafka topic and stream the messages to a websocket. I have defined a service as follows, using this documentation (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) as a guideline:

    // service call
    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

    // service implementation
    override def stream() = ServiceCall { req =>
      req.runForeach(str => log.info(s"client: $str"))
      kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
        // add message to a Source and return Done
      ))
      Future.successful(/* some Source[String, NotUsed] */)
    }

However, I can't quite figure out how to handle my Kafka messages. atLeastOnce expects a Flow[String, Done, _], which implies that inside Flow.fromFunction I need to add each message (a string) to a Source that has been created outside of the subscriber.

So my question is twofold: 1) How do I create an Akka Streams Source that receives messages from a Kafka topic subscriber at runtime? 2) How do I append Kafka messages to that Source from within the Flow?


Solution

  • You seem to be misunderstanding Lagom's service API. In ServiceCall[Request, Response], the first type parameter is what the client sends. If you're producing the stream inside the body of your service call, there's no input to your call. Your current signature,

    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
    

    implies that when the client provides a Source[String, NotUsed], the service will respond in kind. Your client is not directly providing this; therefore, your signature should likely be

    def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]
    

    Now to your question...

    The Scala giter8 template doesn't include an example of this, but the Java version contains what it calls an autonomous stream, which does approximately what you want.

    In Scala, this code would look something like...

    override def autonomousStream(): ServiceCall[
      Source[String, NotUsed], 
      Source[String, NotUsed]
    ] = ServiceCall { hellos => Future {
        hellos.mapAsync(8)(...) // Scala's mapAsync is curried: mapAsync(parallelism)(f)
      }
    }
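
    To make the shape of that call concrete, here is a minimal sketch that runs outside Lagom with plain Akka Streams. The names MapAsyncDemo, greet and respond are made up for illustration, and greet is only a stand-in for whatever asynchronous lookup the real service performs. It assumes Akka 2.5 (the version Lagom 1.4 ships with), hence the explicit ActorMaterializer.

    ```scala
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object MapAsyncDemo {
      // Hypothetical stand-in for an asynchronous lookup (e.g. a repository call).
      def greet(name: String): Future[String] =
        Future.successful(s"Hello, $name!")

      // Same shape as the body of the service call: transform the client's stream.
      // mapAsync runs up to 8 futures concurrently and emits results in input order.
      def respond(hellos: Source[String, NotUsed]): Source[String, NotUsed] =
        hellos.mapAsync(8)(greet)

      // Runs the transformation over a small in-memory source and collects the output.
      def run(): Seq[String] = {
        implicit val system: ActorSystem = ActorSystem("map-async-demo")
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        try Await.result(respond(Source(List("Alice", "Bob"))).runWith(Sink.seq), 5.seconds)
        finally system.terminate()
      }
    }
    ```

    Calling MapAsyncDemo.run() yields the two greetings in the same order as the input names, because mapAsync preserves ordering.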
    

    Since your call maps over a Kafka topic rather than over the input stream, you'll want to do something like this:

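    override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall {
      _ =>
        // Future { ... } needs an implicit ExecutionContext in scope
        Future {
          kafkaTopic()
            .subscribe
            .atMostOnceSource                    // Lagom's Subscriber exposes the topic as a Source
            .mapAsync(...)
            .mapMaterializedValue(_ => NotUsed)  // atMostOnceSource is a Source[Payload, _]
        }
    }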
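
    If you do need to keep atLeastOnce, as in the question, one possible way to "append to a Source from inside the Flow" is to bridge the two with a queue. The sketch below is an assumption on my part, not something taken from the Lagom docs: it uses plain Akka Streams (Source.queue plus BroadcastHub), and the names QueueBridgeDemo and handler, the buffer sizes, and the dropHead strategy are all illustrative. The in-memory Source(List(...)) stands in for the Kafka subscriber; in Lagom you would pass handler to subscribe.atLeastOnce and return outbound from the service call.

    ```scala
    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, OverflowStrategy}
    import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object QueueBridgeDemo {
      def run(): Seq[String] = {
        implicit val system: ActorSystem = ActorSystem("queue-bridge-demo")
        implicit val materializer: ActorMaterializer = ActorMaterializer()
        import system.dispatcher // ExecutionContext for mapping over the offer result

        // Materialize once: `queue` is the write side, `outbound` is a Source
        // that can be handed to any number of websocket clients.
        val (queue, outbound) =
          Source.queue[String](64, OverflowStrategy.dropHead) // illustrative sizes
            .toMat(BroadcastHub.sink[String](bufferSize = 64))(Keep.both)
            .run()

        // Flow[String, Done, _] of the kind atLeastOnce expects: push each
        // message into the queue, then acknowledge it with Done.
        val handler: Flow[String, Done, NotUsed] =
          Flow[String].mapAsync(1)(msg => queue.offer(msg).map(_ => Done))

        try {
          // A "client" attaches to the hub and takes two messages.
          val received = outbound.take(2).runWith(Sink.seq)
          // Stand-in for the Kafka subscriber feeding the handler flow.
          Source(List("first", "second")).via(handler).runWith(Sink.ignore)
          Await.result(received, 5.seconds)
        } finally system.terminate()
      }
    }
    ```

    Note the trade-off: with this bridge, "at least once" only covers the hand-off into the queue. A message counts as processed once it has been offered, whether or not any websocket client was connected to receive it, so atMostOnceSource is the simpler choice when that guarantee is not needed.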