playframework, apache-kafka, akka-stream, akka-kafka

Getting the last message from a Kafka topic using akka-stream-kafka when connecting with a WebSocket


Is it at all possible to get only the last message on a Kafka topic using Akka Streams Kafka? I'm creating a websocket which listens to a Kafka topic, but currently it retrieves all prior unread messages when I connect. This can add up to quite a lot of messages, so I'm only interested in the last message plus any future messages (or only future messages).

The source:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  // Ignore anything the client sends; emit the Kafka record values as Strings
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value))
}

Consumer settings:

@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)

  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

I've tried setting ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"), which should "automatically reset the offset to the latest offset", but it does not seem to have any effect.
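
For reference, this is how the property is applied on top of the settings above (a sketch; note that per the Kafka documentation, auto.offset.reset is only consulted when the consumer group has no committed offset, which may be why it appears to do nothing here):

ConsumerSettings(config.underlying, deserializer, deserializer)
  .withBootstrapServers(kafkaUrl)
  .withGroupId(GroupId)
  // auto.offset.reset only applies when no committed offset exists for GroupId
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")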


Solution

  • I was able to avoid getting any upstream data upon client connection using a method described very neatly by David van Geest here

    It boils down to having a BroadcastHub on the Consumer:

    val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
      .map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
      .toMat(BroadcastHub.sink)(Keep.right)
      .run()
    

    And connecting a static consumer to drain all the upstream data:

    liveSource.to(Sink.ignore).run()
    

    From there, this lets me have a WebSocket client subscribe to all data received by the consumer, as such:

    def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
    

    Or filter based on the Kafka topic (or whatever else you want):

    def kafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] =
      Flow.fromSinkAndSource(Sink.ignore, liveSource.filter { x =>
        // Keep only messages whose wrapped "topic" field matches the requested topic
        (Json.parse(x) \ "topic").asOpt[String].contains(kafkaTopic)
      })
    
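    For completeness, a minimal sketch of how such a Flow can be exposed as a Play WebSocket endpoint (the action name is hypothetical; since Akka's Flow is contravariant in its input, a Flow[Any, String, NotUsed] satisfies the Flow[String, String, _] that Play's default String transformer expects):

    def socket(kafkaTopic: String): WebSocket = WebSocket.accept[String, String] { _ =>
      kafkaSpecificSource(kafkaTopic)
    }
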

    This does not solve the issue of pushing x amount of historic data to the user on first connect, but I foresee us adding a simple database query for any historic data and letting the WebSocket connection focus solely on the live-streamed data.
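
    Should we serve the history over the same stream instead, one possible shape is sketched below (loadHistory is an assumed repository method returning Source[String, NotUsed]; overlap and de-duplication between the history and the live stream are not handled here):

    def historyThenLive(kafkaTopic: String): Flow[Any, String, NotUsed] =
      Flow.fromSinkAndSource(
        Sink.ignore,
        // Replay persisted history first, then continue with the live hub stream
        loadHistory(kafkaTopic).concat(liveSource)
      )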