Search code examples
project-reactorreactorreactor-kafka

Reactor Flux - Only emit from Publisher on completion


I have some Reactor Kafka code that reads in events via a KafkaReceiver and writes 1..many downstream messages via 1 or more KafkaSenders that are concatenated into a single Publisher. Everything is working great, but what I'd like to do is only emit an event from this concatenated senders Flux when it is complete (i.e. it's done writing to all downstream topics for any given event, so it does not emit anything for each element as it writes to Kafka downstream until it's done). This way I could sample() and periodically commit offsets, knowing that whenever it is that sample() happens to trigger and I commit offsets for an incoming event that I've processed all downstream messages for each event I'm committing offsets for. It seems like I could use either pauseUntilOther() or then() somehow, but I don't quite see exactly how given my code and specific use case. Any thoughts or suggestions appreciated, thanks.

Main Publisher code:

this.kafkaReceiver.receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
                partitionFlux.publishOn(this.scheduler)
                        .flatMap(this::processEvent)
                        .sample(Duration.ofMillis(10))
                        .concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
        .subscribe();

Concatenated KafkaSenders returned by call to processEvent():

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);

Solution

  • Sounds like Flux.last() is what you are looking for:

    return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
        .last();
    

    Then your .sample(Duration.ofMillis(10)) would do whatever is available as a last item from one or several batches sent to those brokers. And in the end your commitReceiverOffset() would properly commit whatever was the last.

    See its JavaDocs for more info:

    /**
     * Emit the last element observed before complete signal as a {@link Mono}, or emit
     * {@link NoSuchElementException} error if the source was empty.
     * For a passive version use {@link #takeLast(int)}
     *
     * <p>
     * <img class="marble" src="doc-files/marbles/last.svg" alt="">
     *
     * <p><strong>Discard Support:</strong> This operator discards elements before the last.
     *
     * @return a {@link Mono} with the last value in this {@link Flux}
     */
    public final Mono<T> last() {
    

    and marble diagram: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg