How to iterate an object inside a Flux and do an operation on it?


I'm using Project Reactor and I'd like to do the following:

    @Override
    public void run(ApplicationArguments args) {
        Flux.from(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(this::getObject)
                        .flatMap(this::iterateElasticWrites)
                        .flatMap(this::writeTheWholeObjectToS3)
        ).subscribe();
    }

    // What I'd like to do, but this code is not reactive
    private Publisher<MyObj> iterateElasticWrites(MyObj message) {
        for (MyDoc file: message.getDocs()) {
            writeElasticDoc(file.getText());
        }
        return Mono.just(message);
    }

I'm struggling to find the Project Reactor equivalent of iterateElasticWrites. I'd like to iterate over the documents list of my object (MyObj) and write each element to Elasticsearch reactively.


Solution

  • In Reactor you build a reactive flow by chaining operators, and every reactive/async method should return a Mono or Flux so that it can be composed into that flow.

    Based on your example, it could look like this:

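    private Mono<MyObj> iterateElasticWrites(MyObj message) {
        return Flux.fromIterable(message.getDocs())
                // one reactive write per document
                .flatMap(doc -> writeElasticDoc(doc.getText()))
                // wait for all writes to complete, then emit the original message
                .then(Mono.just(message));
    }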
    

    where writeElasticDoc could be defined as

    private Mono<Void> writeElasticDoc(String text) {
        ...
    }
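
To try the pattern end to end, here is a minimal self-contained sketch. It assumes reactor-core is on the classpath and Java 16+ for records. MyDoc, MyObj, the `written` list and the body of writeElasticDoc are hypothetical stand-ins: the write only records the text in memory, where real code would call a reactive Elasticsearch client.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class IterateWritesSketch {

    // Hypothetical stand-ins for the MyDoc and MyObj types from the question.
    record MyDoc(String text) {
        String getText() { return text; }
    }

    record MyObj(List<MyDoc> docs) {
        List<MyDoc> getDocs() { return docs; }
    }

    // In-memory record of "written" texts, used here instead of Elasticsearch.
    static final List<String> written = new CopyOnWriteArrayList<>();

    // Hypothetical write: the side effect is deferred until subscription.
    static Mono<Void> writeElasticDoc(String text) {
        return Mono.fromRunnable(() -> written.add(text));
    }

    // Writes every document, then emits the original message once all writes completed.
    static Mono<MyObj> iterateElasticWrites(MyObj message) {
        return Flux.fromIterable(message.getDocs())
                .flatMap(doc -> writeElasticDoc(doc.getText()))
                .then(Mono.just(message));
    }

    public static void main(String[] args) {
        MyObj message = new MyObj(List.of(new MyDoc("a"), new MyDoc("b")));
        // block() is only for this demo; in the Kafka pipeline the Mono is consumed by flatMap.
        MyObj result = iterateElasticWrites(message).block();
        System.out.println("same message returned: " + (result == message));
        System.out.println("documents written: " + written.size());
    }
}
```

Note that flatMap subscribes to the inner writes eagerly and may run them concurrently when they are truly asynchronous. If the documents must be written one after another, concatMap can be used in its place.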