I am trying to use Consumer.committablePartitionedSource() and create a stream per partition, as shown below:
public void setup() {
    control = Consumer.committablePartitionedSource(consumerSettings,
                    Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
            .mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
            .toMat(Sink.ignore(), Consumer::createDrainingControl)
            .run(Materializer.matFromSystem(actorSystem));
}

private CompletionStage<Done> setupSource(
        Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair,
        CommitterSettings committerSettings) {
    LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
    return pair.second()
            .mapAsync(16, msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                    .thenApply(param -> msg.committableOffset()))
            .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
            .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
}
While setting up the source for each partition I use a mapAsync parallelism that I want to change based on the number of partitions assigned to the node. I can do that on the first assignment of partitions to the node. But as new nodes join the cluster, partitions are revoked and reassigned, and this time the stream does not emit the partitions the node already had (due to Kafka's cooperative rebalancing protocol), so I cannot reconfigure their parallelism.
I am sharing the same dispatcher across all sources, and if I keep the same parallelism after rebalancing, I feel that messages from each partition do not get a fair chance of being processed. Am I correct? Please correct me if not.
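A rough arithmetic sketch of the concern above, in plain Java with no Akka or Kafka dependencies. It assumes a hypothetical node-wide budget of 48 concurrent messages and a hypothetical helper perPartition() that divides that budget by the number of partitions known when a stream is set up. Because, as described, partitions retained across a cooperative rebalance are not emitted again, only newly added partitions get a recomputed value, so the node-wide total drifts away from the budget:

```java
// Plain-Java arithmetic model; no Akka or Kafka classes involved.
class PerPartitionParallelism {

    // Hypothetical helper: split a node-wide budget across the partitions known at setup time.
    static int perPartition(int budget, int assignedPartitions) {
        if (assignedPartitions <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        return Math.max(1, budget / assignedPartitions);
    }

    // Total concurrent messages after an incremental (cooperative) rebalance:
    // retained partitions keep the value computed at the first assignment,
    // only newly added partitions get a value based on the new partition count.
    static int totalAfterRebalance(int budget, int initial, int revoked, int added) {
        int retained = initial - revoked;
        int firstValue = perPartition(budget, initial);
        int total = retained * firstValue;
        if (added > 0) {
            total += added * perPartition(budget, retained + added);
        }
        return total;
    }

    public static void main(String[] args) {
        int budget = 48;
        System.out.println("4 partitions, then 2 revoked: "
                + totalAfterRebalance(budget, 4, 2, 0));
        System.out.println("2 partitions, then 2 added: "
                + totalAfterRebalance(budget, 2, 0, 2));
    }
}
```

With 4 partitions at 12 each, losing 2 leaves only 24 in flight; starting with 2 partitions at 24 each and then gaining 2 more gives 72, well above the intended 48.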
If I understand you correctly, you want a fixed parallelism across a dynamically changing number of Sources that come and go as Kafka rebalances topic partitions.
Have a look at the first example in the Alpakka Kafka documentation. It can be adapted to your example like this:
Consumer.DrainingControl<Done> control =
        Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("chat"))
                // log each partition as its Source is emitted
                .wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE", p.first().partition()))
                // merge all per-partition Sources into one stream of messages
                .flatMapMerge(Integer.MAX_VALUE, Pair::second)
                // at most 16 messages in flight in total, whatever the number of partitions
                .mapAsync(
                        16,
                        msg -> CompletableFuture
                                .supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                // commit the offsets emitted by mapAsync
                .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
So, basically, Consumer.committablePartitionedSource() emits a Source whenever Kafka assigns a partition to this consumer, and terminates that Source when the previously assigned partition is rebalanced and taken away from this consumer.
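Under Kafka's cooperative (incremental) rebalancing, a consumer is told only about the partitions it gained or lost, not its full assignment, which is why partitions a node keeps across a rebalance do not show up as new Sources. A minimal plain-Java model of that delta, with partitions as plain integers; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Model of the incremental view a consumer gets under cooperative rebalancing.
class AssignmentDelta {

    // Newly assigned partitions: in this model, a new per-partition Source is emitted for each.
    static Set<Integer> added(Set<Integer> before, Set<Integer> after) {
        Set<Integer> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    // Partitions taken away: in this model, their per-partition Sources terminate.
    static Set<Integer> revoked(Set<Integer> before, Set<Integer> after) {
        Set<Integer> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> before = new TreeSet<>(Arrays.asList(0, 1, 2, 3));
        // A new node joins and takes partitions 2 and 3 away from this one.
        Set<Integer> after = new TreeSet<>(Arrays.asList(0, 1));
        System.out.println("added:   " + added(before, after));
        System.out.println("revoked: " + revoked(before, after));
        // Partitions 0 and 1 appear in neither set, so nothing new is emitted for them.
    }
}
```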
The flatMapMerge takes those Sources and merges the messages they output.
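An idealized picture of what merging gives the downstream stage: a plain-Java round-robin merge over per-partition message lists. This is only a model; flatMapMerge does not promise strict round-robin, and in a running stream the interleaving depends on which partition Sources have messages ready when downstream demand arrives. The class name RoundRobinMerge is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Idealized model only: strict round-robin over per-partition message lists.
class RoundRobinMerge {

    static <T> List<T> merge(List<List<T>> partitions) {
        Deque<Iterator<T>> active = new ArrayDeque<>();
        for (List<T> partition : partitions) {
            Iterator<T> it = partition.iterator();
            if (it.hasNext()) {
                active.addLast(it);
            }
        }
        List<T> out = new ArrayList<>();
        while (!active.isEmpty()) {
            Iterator<T> it = active.pollFirst();
            out.add(it.next());      // take one message from this partition
            if (it.hasNext()) {
                active.addLast(it);  // then send the partition to the back of the queue
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("p0-m0", "p0-m1", "p0-m2"),
                Arrays.asList("p1-m0"),
                Arrays.asList("p2-m0", "p2-m1"));
        System.out.println(merge(partitions));
    }
}
```

This prints [p0-m0, p1-m0, p2-m0, p0-m1, p2-m1, p0-m2]: a partition with a long backlog cannot starve the others, because each takes a turn.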
All those messages compete in the mapAsync stage to get processed. How fairly they compete really comes down to the flatMapMerge above, which should give all the Sources an equal chance to emit their messages. Regardless of how many Sources are outputting messages, they all share a fixed parallelism here, which I believe is what you're after.
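To illustrate "one fixed parallelism shared by all partitions" outside Akka Streams, here is a JDK-only analogy: a single Semaphore caps in-flight work for messages from any number of partitions. It is not how mapAsync is implemented (mapAsync is back-pressured by stream demand and emits results in upstream order, whereas this sketch blocks the caller and completes futures in any order). consumeMessage here is a hypothetical stand-in for the question's method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only analogy (not Akka): one fixed concurrency limit shared by every partition.
class SharedParallelism {
    private final Semaphore permits;
    private final ExecutorService pool;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    SharedParallelism(int parallelism, ExecutorService pool) {
        this.permits = new Semaphore(parallelism);
        this.pool = pool;
    }

    // Blocks the caller until a slot is free, a crude stand-in for back-pressure.
    CompletableFuture<Long> process(int partition, long offset) {
        permits.acquireUninterruptibly();
        maxObserved.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        return CompletableFuture
                .supplyAsync(() -> consumeMessage(partition, offset), pool)
                .whenComplete((result, error) -> {
                    inFlight.decrementAndGet();
                    permits.release();
                });
    }

    // Hypothetical stand-in for the question's consumeMessage(msg).
    private static long consumeMessage(int partition, long offset) {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return offset;
    }

    // Feeds interleaved messages from all partitions and returns the peak in-flight count.
    static int runDemo(int parallelism, int partitions, int messagesPerPartition) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism * 2);
        SharedParallelism stage = new SharedParallelism(parallelism, pool);
        List<CompletableFuture<Long>> results = new ArrayList<>();
        for (long offset = 0; offset < messagesPerPartition; offset++) {
            for (int p = 0; p < partitions; p++) {
                results.add(stage.process(p, offset));
            }
        }
        CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return stage.maxObserved.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight, 2 partitions: " + runDemo(16, 2, 40));
        System.out.println("max in flight, 20 partitions: " + runDemo(16, 20, 4));
    }
}
```

Whether 2 or 20 partitions feed it, the observed maximum never exceeds the single limit of 16, mirroring the mapAsync(16, ...) in the answer's code.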
All those messages eventually reach the Committer.sink, which handles offset committing.
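A simplified model of the bookkeeping an offset committer does, assuming messages from each partition arrive in order (mapAsync preserves upstream order; mapAsyncUnordered would not). Kafka commits the offset of the next record to read, so the last processed offset plus one is recorded per partition, and a batch is flushed every maxBatch messages. Alpakka's Committer.sink does its own batching, configured through CommitterSettings; the CommitBatcher class below is hypothetical and is not its implementation:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical batching model; not Alpakka's Committer.sink implementation.
class CommitBatcher {
    private final int maxBatch;
    private final Map<Integer, Long> pending = new TreeMap<>(); // partition -> offset to commit
    private int count = 0;

    CommitBatcher(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // Records a processed message; returns the offsets to commit once the batch is full.
    Optional<Map<Integer, Long>> add(int partition, long offset) {
        // Kafka's committed offset is the next record to read, hence offset + 1.
        pending.merge(partition, offset + 1, Long::max);
        count++;
        if (count < maxBatch) {
            return Optional.empty();
        }
        Map<Integer, Long> batch = new TreeMap<>(pending);
        pending.clear();
        count = 0;
        return Optional.of(batch);
    }

    public static void main(String[] args) {
        CommitBatcher batcher = new CommitBatcher(3);
        batcher.add(0, 5);
        batcher.add(1, 7);
        // The third message fills the batch: one commit covers both partitions.
        batcher.add(0, 6).ifPresent(batch -> System.out.println("commit " + batch));
    }
}
```

Here the single commit is {0=7, 1=8}: one request covers every partition that made progress since the last flush.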