
Limit the throughput of a Reactor Flux reading a MongoDB collection


I am using Spring 5, specifically Project Reactor, to read information from a huge MongoDB collection and publish it to a Kafka topic. Unfortunately, the Kafka messages are produced much faster than the program that consumes them, so I need to implement some backpressure mechanism.

Suppose I want a throughput of 100 messages per second. After a little googling, I decided to combine the buffer(int maxSize) operator with a zip against a Flux that emits a tick at a fixed interval.

 // Create a clock that emits an event every second
 final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
 // Create a buffered producer
 final Flux<ProducerRecord<String, Data>> outbound =
            repository.findAll()
                      .map(this::buildData)
                      .map(this::createKafkaMessage)
                      .buffer(100)
                      // Limiting the emission in time interval
                      .zipWith(clock, (msgs, tick) -> msgs)
                      .flatMap(Flux::fromIterable);
 // Subscribe a Kafka sender
 kafkaSender.createOutbound()
            .send(outbound)
            .then()
            .block();

Is there a smarter way to do this? It seems a little complex to me (the zip part, especially).


Solution

  • Yes, you can use the delayElements(Duration.ofSeconds(1)) operator directly, without needing to zipWith a clock Flux, as in the sketch below. Reactor is a continuously evolving project and gains new operators with each release, so it is worth staying up to date :) Hope this was helpful!
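
For reference, here is a minimal sketch of the simplified pipeline. It reuses repository, buildData, createKafkaMessage, and kafkaSender from the question and only swaps the clock/zipWith pair for delayElements:

 // Buffer up to 100 messages per batch
 final Flux<ProducerRecord<String, Data>> outbound =
            repository.findAll()
                      .map(this::buildData)
                      .map(this::createKafkaMessage)
                      .buffer(100)
                      // Release at most one buffer per second,
                      // replacing the clock + zipWith combination
                      .delayElements(Duration.ofSeconds(1))
                      .flatMap(Flux::fromIterable);
 // Subscribe a Kafka sender, exactly as before
 kafkaSender.createOutbound()
            .send(outbound)
            .then()
            .block();

delayElements delays each emitted element (here, each 100-message buffer) by the given Duration, so the downstream sees at most 100 messages per second, matching the behavior of the zipWith-based version without the extra clock Flux.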