Tags: java, akka-stream, alpakka

Adjusting parallelism based on the number of partitions assigned in Consumer.committablePartitionedSource


I am trying to use Consumer.committablePartitionedSource() and create a stream per partition, as shown below:

    public void setup() {
        // Emits one Source per assigned partition; each is materialized and
        // drained independently in setupSource.
        control = Consumer.committablePartitionedSource(consumerSettings,
                Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
                .mapAsyncUnordered(Integer.MAX_VALUE, pair -> setupSource(pair, committerSettings))
                .toMat(Sink.ignore(), Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
    }

    private CompletionStage<Done> setupSource(Pair<TopicPartition, Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed>> pair, CommitterSettings committerSettings) {
        LOGGER.info("SETTING UP PARTITION-{} SOURCE", pair.first().partition());
        // Process up to 16 messages of this partition in parallel, then commit
        // the offsets.
        return pair.second().mapAsync(16, msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg), actorSystem.dispatcher())
                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                .runWith(Committer.sink(committerSettings), Materializer.matFromSystem(actorSystem));
    }

When setting up each per-partition source I pass a parallelism value, which I want to adjust based on the number of partitions assigned to the node. I can do that on the first assignment of partitions. But as new nodes join the cluster, partitions are revoked and reassigned, and this time the stream does not re-emit the already-assigned partitions (due to Kafka's cooperative rebalancing protocol), so I get no chance to reconfigure their parallelism.

I am sharing the same dispatcher across all sources here, and I suspect that if I keep the same parallelism after a rebalance, each partition no longer gets a fair chance at message processing. Am I correct? Please correct me.
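
For reference, here is a minimal sketch of what such a PartitionAssignmentListener could look like, assuming Alpakka Kafka 2.0+ and its javadsl PartitionAssignmentHandler interface; the class body and the AtomicInteger counter are illustrative, not the original implementation:

    import java.util.Set;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.common.TopicPartition;

    import akka.kafka.RestrictedConsumer;
    import akka.kafka.javadsl.PartitionAssignmentHandler;

    public class PartitionAssignmentListener implements PartitionAssignmentHandler {
        private final AtomicInteger assignedCount = new AtomicInteger();

        @Override
        public void onAssign(Set<TopicPartition> assignedTps, RestrictedConsumer consumer) {
            // With the cooperative protocol this reports only newly added
            // partitions, not the ones this node already owns.
            assignedCount.addAndGet(assignedTps.size());
        }

        @Override
        public void onRevoke(Set<TopicPartition> revokedTps, RestrictedConsumer consumer) {
            assignedCount.addAndGet(-revokedTps.size());
        }

        @Override
        public void onLost(Set<TopicPartition> lostTps, RestrictedConsumer consumer) {
            assignedCount.addAndGet(-lostTps.size());
        }

        @Override
        public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer) {
            // Consumer is shutting down; nothing to track here.
        }
    }

That onAssign only sees newly added partitions is exactly why the partition count alone cannot reconfigure sources that are already running.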


Solution

  • If I understand you correctly, you want a fixed parallelism across a dynamically changing number of Sources that come and go as Kafka rebalances topic partitions.

    Have a look at the first example in the Alpakka Kafka documentation. It can be adapted to your example like this:

     Consumer.DrainingControl<Done> control =
          Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("chat"))
                  .wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE", p.first().partition()))
                  // Merge all per-partition Sources into a single stream.
                  .flatMapMerge(Integer.MAX_VALUE, Pair::second)
                  // One fixed parallelism shared by all partitions.
                  .mapAsync(
                    16,
                    msg -> CompletableFuture
                             .supplyAsync(() -> consumeMessage(msg),
                                          actorSystem.dispatcher())
                             .thenApply(param -> msg.committableOffset()))
                  .withAttributes(
                    ActorAttributes.supervisionStrategy(
                      ex -> Supervision.restart()))
                  .toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
                  .run(Materializer.matFromSystem(actorSystem));
    

    So basically Consumer.committablePartitionedSource() will emit a Source every time Kafka assigns a partition to this consumer, and will terminate such a Source when a previously assigned partition is rebalanced away from this consumer.

    The flatMapMerge will take those Sources and merge the messages they output.

    All those messages will compete in the mapAsync stage to get processed. The fairness of this competition really comes down to the flatMapMerge above, which should give all Sources an equal chance to emit their messages. Regardless of how many Sources are outputting messages, they all share a fixed parallelism here, which I believe is what you're after.
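
    To see the flatMapMerge plus mapAsync behaviour in isolation, here is a self-contained sketch without Kafka; the three substreams stand in for the per-partition Sources, and all names and numbers are illustrative:

     import java.util.concurrent.CompletableFuture;

     import akka.actor.ActorSystem;
     import akka.stream.Materializer;
     import akka.stream.javadsl.Source;

     public class FlatMapMergeDemo {
         public static void main(String[] args) {
             ActorSystem system = ActorSystem.create("demo");

             // Three substreams stand in for the per-partition Sources.
             Source.range(1, 3)
                     .flatMapMerge(3, i ->
                             Source.range(1, 5).map(j -> "partition-" + i + " msg-" + j))
                     // All merged elements compete for the same parallelism of 4,
                     // however many substreams are active at the moment.
                     .mapAsync(4, msg -> CompletableFuture.supplyAsync(() -> msg))
                     .runForeach(System.out::println, Materializer.matFromSystem(system))
                     .thenRun(system::terminate);
         }
     }

    Running this prints the elements of the three substreams interleaved rather than one substream drained after another, which is the fairness property relied on above.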

    All those messages eventually reach the Committer.sink, which handles offset committing.
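
    For completeness, the CommitterSettings passed to Committer.sink control how offsets are batched before being committed; a minimal sketch using Alpakka Kafka's CommitterSettings API, with illustrative values:

     import java.time.Duration;

     import akka.kafka.CommitterSettings;

     // Commit after at most 100 accumulated offsets, or after 1 second,
     // whichever comes first; defaults are read from the
     // akka.kafka.committer config section.
     CommitterSettings committerSettings =
         CommitterSettings.create(actorSystem)
                          .withMaxBatch(100)
                          .withMaxInterval(Duration.ofSeconds(1));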