java, spring-webflux, reactive-streams

Aggregating result of two Flux into Mono without blocking


I have the following domain structure:

package org.example;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.List;

@Getter
@Builder
@ToString
public class Payments {

    private List<SuccessAccount> successAccounts;
    private List<FailedAccount> failedAccounts;

    @Getter
    @Builder
    @ToString
    public static class SuccessAccount {
        private String name;
        private String accountNumber;
    }

    @Getter
    @Builder
    @ToString
    public static class FailedAccount {
        private String name;
        private String accountNumber;
        private String errorCode;
    }
}

I get the failed accounts and the successful accounts from two different methods. Once I have received both, I want to create a Payments object without blocking. I tried the following, but as per the docs, calling subscribe on collectList is a blocking call. Can someone help me make this work without blocking? It is possible to call subscribe in the main method, but this is a simplified example of what I am trying to solve. In my actual case, I get the failed and successful accounts from the same web service call, and I need to return an object containing both as the response from a REST controller.

package org.example;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        getPaymentData().subscribe(System.out::println);
    }

    public static Mono<Payments> getPaymentData() {

        Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
        Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();

        List<Payments.SuccessAccount> successAccounts = new ArrayList<>();

        List<Payments.FailedAccount> failedAccounts = new ArrayList<>();

        accountsFailed.collectList().subscribe(failedAccounts::addAll);// This is blocking call as per docs

        accountsSucceeded.collectList().subscribe(successAccounts::addAll);// This is blocking call as per docs

        return Mono.just(Payments.builder()
                .failedAccounts(failedAccounts)
                .successAccounts(successAccounts)
                .build());
    }
    
    public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
        return Flux.just(Payments.SuccessAccount.builder()
                        .accountNumber("1234345")
                        .name("Payee1")
                        .build(),
                Payments.SuccessAccount.builder()
                        .accountNumber("83673674")
                        .name("Payee2")
                        .build());
    }
    
    public static Flux<Payments.FailedAccount> getAccountsFailed() {
        return Flux.just(Payments.FailedAccount.builder()
                        .accountNumber("12234345")
                        .name("Payee3")
                        .errorCode("8938")
                        .build(),
                Payments.FailedAccount.builder()
                        .accountNumber("3342343")
                        .name("Payee4")
                        .errorCode("8938")
                        .build());
    }
}

Solution

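  • What you want is called a zip operation.

    Calling subscribe() inside getPaymentData() is not the fix: with an asynchronous source it returns before the lists are filled, so Mono.just(...) may capture empty lists. Instead, call collectList() on each Flux to get a Mono<List<...>>, then combine the two with the zipWith instance method on Mono. The combinator runs only after both lists have been emitted, and nothing is subscribed until the caller subscribes (for example, WebFlux does so when the Mono is returned from a controller):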

    Mono<Payments> combined = accountsFailed.collectList().zipWith(
            accountsSucceeded.collectList(), 
            (failedAccounts, successAccounts) 
                    -> Payments.builder()
                            .failedAccounts(failedAccounts)
                            .successAccounts(successAccounts)
                            .build()
    );
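
    For completeness, here is a minimal self-contained sketch of the question's getPaymentData() rewritten around zipWith. It assumes Java 16+ and reactor-core on the classpath; the records and the class name ZipPaymentsSketch are stand-ins I introduced so the example compiles without Lombok, they are not part of the original code.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ZipPaymentsSketch {

    // Hypothetical records standing in for the Lombok-annotated classes from the question.
    record SuccessAccount(String name, String accountNumber) {}
    record FailedAccount(String name, String accountNumber, String errorCode) {}
    record Payments(List<SuccessAccount> successAccounts, List<FailedAccount> failedAccounts) {}

    static Flux<SuccessAccount> getAccountsSucceeded() {
        return Flux.just(
                new SuccessAccount("Payee1", "1234345"),
                new SuccessAccount("Payee2", "83673674"));
    }

    static Flux<FailedAccount> getAccountsFailed() {
        return Flux.just(
                new FailedAccount("Payee3", "12234345", "8938"),
                new FailedAccount("Payee4", "3342343", "8938"));
    }

    // Only assembles the pipeline; nothing runs until a caller subscribes.
    static Mono<Payments> getPaymentData() {
        Mono<List<FailedAccount>> failed = getAccountsFailed().collectList();
        Mono<List<SuccessAccount>> succeeded = getAccountsSucceeded().collectList();

        // The combinator is invoked once both lists have been emitted.
        return failed.zipWith(succeeded,
                (failedAccounts, successAccounts) -> new Payments(successAccounts, failedAccounts));
    }

    public static void main(String[] args) {
        // In a WebFlux controller you would return the Mono and let the framework subscribe.
        getPaymentData().subscribe(System.out::println);
    }
}
```

    Since collectList() emits an empty list for an empty Flux, the zipped Mono still produces a Payments object when one side has no accounts. In a REST controller, returning the Mono<Payments> from the handler method should be enough, with no explicit subscribe() or block() call.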