akka, akka-stream, akka-kafka

Does this Akka Kafka stream configuration benefit from the backpressure mechanism of Akka Streams?


We have an Akka application that consumes from a Kafka topic and sends each received message to an Akka actor. I am not sure that the way I programmed it uses all the benefits of the backpressure mechanism built into Akka Streams.

Following is my configuration...

val control: Consumer.DrainingControl[Done] =
  Consumer
    .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
    .map { consumerRecord =>
      val myAvro = consumerRecord.value().asInstanceOf[MyAvro]

      val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)

      // Fire-and-forget: the stream does not wait for the actor to handle the message
      myActor ! Update(myAvro)
    }
    .via(Committer.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
    .toMat(Sink.ignore)(Consumer.DrainingControl.apply)
    .run()

This does what I expect for my business case: myActor receives the Update(MyAvro) commands.

I am more confused by the technical concept of backpressure. As far as I understand, backpressure is controlled partially by sinks, but in this stream configuration my sink is only 'Sink.ignore'. So my sink is not doing anything for backpressure.

I am also curious when Akka Kafka Stream commits the Kafka topic offset. Is it the moment the command is delivered to the mailbox of MyActor? If so, how can I handle scenarios like the ask pattern, where the Kafka offset should not be committed until the ask completes?

I see some factory methods dealing with manual offset control, 'plainPartitionedManualOffsetSource' and 'committablePartitionedManualOffsetSource', but I can't find any examples for them. Can I decide within my business logic to commit the offsets manually?

As an alternative Configuration, I can use something like this.

val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()

Now that I have access to Sink.actorRef, I think the backpressure mechanism has a chance to take effect. Naturally this code will not work, because I have no idea how I can access 'myAvro' in this setup.

Thx for answers..


Solution

  • In the first stream, there will be basically no backpressure: ! (tell) is fire-and-forget and returns immediately, so the map stage never waits for the actor. The offset commit will happen very soon after the message gets sent to myActor.

    For backpressure, you'll want to wait for a response from the target actor, and as you say, the ask pattern is the canonical way to accomplish that. An ask of an actor from outside an actor results in a Future (for all intents and purposes a stream is outside of an actor; that the stages are executed by actors is an implementation detail), which suggests that mapAsync is called for.

    def askUpdate(m: MyAvro): Future[Response] = ???  // get actorref from cluster sharding, send ask, etc.
    

    You would then replace the map in your original stream with

    .mapAsync(parallelism) { consumerRecord =>
      askUpdate(consumerRecord.value().asInstanceOf[MyAvro])
    }
    

    mapAsync limits the number of "in-flight" futures to parallelism. Once parallelism futures that it spawned are still incomplete, it backpressures upstream. If a spawned future completes with a failure (for the ask itself, this will generally be a timeout), the stage will fail; otherwise the results of the successful futures are passed downstream in the order of the incoming elements (very often these will be akka.Done, especially when the only things left to do in the stream are the offset commit and Sink.ignore).
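
To make the askUpdate placeholder a bit more concrete, here is a minimal sketch of how it might look with Akka Typed's ask pattern. Everything named here is an assumption for illustration: MyAvro and Update are simplified stand-ins for the question's types, Update is assumed to carry a replyTo field, the guardian actor stands in for the sharded entity, and a plain Source replaces the Kafka source so the sketch runs without a broker.

```scala
import akka.Done
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AskBackpressureSketch {
  // Hypothetical stand-ins for the question's MyAvro and Update types.
  final case class MyAvro(id: String)
  final case class Update(avro: MyAvro, replyTo: ActorRef[Done])

  // Stand-in for the sharded entity: it acknowledges each Update once handled.
  val myActor: Behavior[Update] = Behaviors.receiveMessage { update =>
    update.replyTo ! Done
    Behaviors.same
  }

  // Sends n updates through mapAsync and returns how many were acknowledged.
  def run(n: Int, parallelism: Int): Int = {
    implicit val system: ActorSystem[Update] = ActorSystem(myActor, "sketch")
    implicit val timeout: Timeout = 3.seconds
    val target: ActorRef[Update] = system // the guardian plays the role of myActor

    // The Future completes only when the actor replies (or fails on timeout),
    // which is what lets mapAsync hold back upstream demand.
    def askUpdate(m: MyAvro): Future[Done] =
      target.ask[Done](replyTo => Update(m, replyTo))

    try {
      val acks: Future[Seq[Done]] =
        Source(1 to n)
          .map(i => MyAvro(i.toString))
          .mapAsync(parallelism)(askUpdate) // at most `parallelism` asks in flight
          .runWith(Sink.seq)
      Await.result(acks, 10.seconds).size
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"acknowledged: ${run(20, 4)}")
}
```

In the Kafka stream from the question, the same mapAsync stage would sit where the map was, ahead of Committer.flowWithOffsetContext, so an offset only reaches the committer after the actor has replied for that record.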