akka-stream akka-kafka

What is a good pattern for committing Kafka consumer offset after processing message?


I am using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives every message exactly once (at-least-once AND at-most-once delivery).

Here's the code I came up with:

  private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicPattern: String,
                           mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]) {

    val groupId = config.getString("group-id")

    implicit val materializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below
    import system.dispatcher // the ExecutionContext that will be used in ask call below

    Consumer.committableSource(consumerSettings, Subscriptions
      .topicPattern(topicPattern))
      .map(message => (message, mapCommittableMessageToSinkMessage(message)))
      .mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1))
      .mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .runWith(Sink.ignore)
  }

As the code shows, the stream carries a tuple of the original message and the transformed message that is passed to the subscriber (an actor that sends it to the remote service). The tuple exists only so that the offset can be committed after the subscriber finishes processing.

Something about it just seems like an anti-pattern, but I'm not sure of a better way to do it. Any suggestions?

Thanks!


Solution

  • One way to keep this cleaner and easier to change could be to use the GraphDSL. It would allow you to spawn a branch of your graph that carries the Committable part of your message, whilst another branch performs all the needed business logic.

    An example graph could be (omitting the boilerplate, including the GraphDSL builder and the broadcast and zip junctions, for clarity):

    val src = Consumer.committableSource(consumerSettings, Subscriptions
          .topicPattern(topicPattern))
    
    val businessLogic = Flow[CommittableMessage[String, String]].mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))
    
    // `to` (rather than `runWith`) yields a Sink that can be wired into the graph
    val snk = Flow[CommittableMessage[String, String]].mapAsync(1)(message => message.committableOffset.commitScaladsl())
          .to(Sink.ignore)  // look into Sink.foldAsync for a more compact re-write of this part
    
    src ~> broadcast
           broadcast ~> businessLogic ~> zip.in0
           broadcast         ~>          zip.in1
                                         zip.out.map(_._2) ~> snk
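
The wiring that the example above leaves out could look roughly like the following sketch. It is not the answerer's code: to stay runnable without a Kafka broker, it swaps in a hypothetical FakeMessage case class for CommittableMessage, a plain Future for the ask to the subscriber, and a collecting sink for the offset commit. It assumes akka-stream 2.5 or 2.6 on the classpath (ActorMaterializer is deprecated in 2.6 but still available).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for CommittableMessage: a payload plus the offset to commit.
final case class FakeMessage(offset: Long, value: String)

object BroadcastZipSketch {

  // Runs the graph once and returns the offsets that reached the "commit" stage.
  def run(): Seq[Long] = {
    implicit val system: ActorSystem = ActorSystem("broadcast-zip-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    try {
      val src = Source(List(FakeMessage(0L, "a"), FakeMessage(1L, "b"), FakeMessage(2L, "c")))

      // Stand-in for ask(subscriber, ...): any asynchronous processing step.
      val businessLogic = Flow[FakeMessage].mapAsync(1)(m => Future(m.value.toUpperCase))

      // Stand-in for commitScaladsl(): records the offset instead of committing it.
      val snk = Flow[FakeMessage]
        .mapAsync(1)(m => Future(m.offset))
        .toMat(Sink.seq)(Keep.right)

      val graph = RunnableGraph.fromGraph(GraphDSL.create(snk) { implicit builder => sink =>
        import GraphDSL.Implicits._

        // The two junctions the original example refers to as `broadcast` and `zip`.
        val broadcast = builder.add(Broadcast[FakeMessage](2))
        val zip = builder.add(Zip[String, FakeMessage]())

        src ~> broadcast
               broadcast ~> businessLogic ~> zip.in0
               broadcast         ~>          zip.in1
                                             zip.out.map(_._2) ~> sink
        ClosedShape
      })

      Await.result(graph.run(), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Because mapAsync(1) preserves ordering, Zip pairs each business-logic result with the message it came from, so an offset only reaches the commit stage after the processing of that same message has completed.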