java reactive-programming spring-webflux project-reactor

How to sum multiple values in a Flux?


I have expense data and need a model that holds the total expense amount, the total paid amount, and the list of expenses. For a plain List, I wrote a method named createExpenseSummaryReport:

List<ExpenseModel> expensesModel = Arrays.asList(
        new ExpenseModel("Expense1", 121.687, 31.89),
        new ExpenseModel("Expense2", 78.03, 72.5),
        new ExpenseModel("Expense3", 102.014, 102.014),
        new ExpenseModel("Expense4", 100.0, 45.42)
    );
 
....
 public static ExpenseReportSummary createExpenseSummaryReport(List<ExpenseModel> expensesModel) {
    BigDecimal totalExpense = BigDecimal.ZERO;
    BigDecimal totalPaidExpense = BigDecimal.ZERO;
    for (ExpenseModel expense : expensesModel) {
      totalExpense = totalExpense.add(BigDecimal.valueOf(expense.getAmount()));
      totalPaidExpense = totalPaidExpense.add(BigDecimal.valueOf(expense.getPaidAmount()));
    }
    return new ExpenseReportSummary(expensesModel, totalExpense, totalPaidExpense);
  }

In the real project, I have a repository named IncomeExpenseRepository. It has a method that returns the income-expense records as a Flux<IncomeExpense>:

Flux<IncomeExpense> byDateContaining = incomeExpenseRepository.findAllByDateContaining(
        request.getPeriod());

I need the equivalent of createExpenseSummaryReport in a reactive style. The report should include the income-expense list, the total amount, and the total paid amount. This is my attempt so far, which I have not been able to complete:

@Override
public Mono<IncomeExpenseSummaryReport> getIncomeExpenseReportByPeriod(
GetIncomeExpenseReportRequest request) {

    AtomicReference<BigDecimal> totalExpenseAmount = new AtomicReference<>(BigDecimal.ZERO);
    AtomicReference<BigDecimal> totalPaidExpenseAmount = new AtomicReference<>(BigDecimal.ZERO);
    
    incomeExpenseRepository.findAllByDateContaining(
            request.getPeriod())
        .doOnEach(d -> {
          IncomeExpense incomeExpense = d.get();
          totalExpenseAmount.getAndSet(BigDecimal.valueOf(incomeExpense.getAmount()));
          if ("paid".equals(incomeExpense.getPaymentStatus())) {
            totalPaidExpenseAmount.getAndSet(BigDecimal.valueOf(incomeExpense.getAmount()));
          }
        });


    return Flux.empty();

  }


Solution

  • The idiomatic way to do this is the Flux#reduce operator, which folds every element of the Flux into a single accumulated value:

    Flux.just(new ExpenseModel("Expense1", 121.687, 31.89),
            new ExpenseModel("Expense2", 78.03, 72.5),
            new ExpenseModel("Expense3", 102.014, 102.014),
            new ExpenseModel("Expense4", 100.0, 45.42))
        .reduce(new ExpenseReportSummary(BigDecimal.ZERO, BigDecimal.ZERO), ((expenseReportSummary, expenseModel) -> {
            expenseReportSummary.setTotalExpense(
                expenseReportSummary.getTotalExpense().add(BigDecimal.valueOf(expenseModel.getAmount())));
            expenseReportSummary.setTotalPaidExpense(
                expenseReportSummary.getTotalPaidExpense().add(BigDecimal.valueOf(expenseModel.getPaidAmount())));
            return expenseReportSummary;
        }))
        .doOnNext(System.out::println)
        .subscribe(); // nothing is computed or printed until the Mono is subscribed
    

    The result is a Mono<ExpenseReportSummary> that emits the accumulated summary once the source Flux completes.

    Output:

    ExpenseReportSummary(totalExpense=401.731, totalPaidExpense=251.824)
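
    One caveat with the snippet above: Flux#reduce(initial, accumulator) reuses the same seed instance for every subscription, and the lambda mutates that seed, so subscribing to the resulting Mono a second time would keep adding to the totals of the first run. Two ways around this are Flux#reduceWith, which takes a Supplier for the seed, or an accumulator that never mutates its input. The following is a minimal sketch of the second option, not a drop-in implementation. The names Expense, Totals, add, merge and summarize are hypothetical, and the payment statuses in the sample data are invented for illustration. The fold is driven by java.util.stream here only so that the sketch runs with the JDK alone; the two-argument function Totals::add has the shape that Flux#reduce expects as its accumulator. It also applies the rule from the question that an amount counts as paid only when the status is "paid".

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

public class ExpenseTotalsSketch {

    // Hypothetical stand-in for IncomeExpense: only the two fields the question reads.
    static final class Expense {
        final double amount;
        final String paymentStatus;

        Expense(double amount, String paymentStatus) {
            this.amount = amount;
            this.paymentStatus = paymentStatus;
        }
    }

    // Immutable running totals: add() and merge() return new instances, so a shared
    // seed such as ZERO is never modified.
    static final class Totals {
        static final Totals ZERO = new Totals(BigDecimal.ZERO, BigDecimal.ZERO);

        final BigDecimal total;
        final BigDecimal paid;

        Totals(BigDecimal total, BigDecimal paid) {
            this.total = total;
            this.paid = paid;
        }

        // Accumulator: fold one expense into the running totals.
        Totals add(Expense e) {
            BigDecimal amount = BigDecimal.valueOf(e.amount);
            BigDecimal newPaid = "paid".equals(e.paymentStatus) ? paid.add(amount) : paid;
            return new Totals(total.add(amount), newPaid);
        }

        // Combiner: required by Stream.reduce, only invoked for parallel streams.
        Totals merge(Totals other) {
            return new Totals(total.add(other.total), paid.add(other.paid));
        }
    }

    static Totals summarize(List<Expense> expenses) {
        return expenses.stream().reduce(Totals.ZERO, Totals::add, Totals::merge);
    }

    public static void main(String[] args) {
        List<Expense> expenses = Arrays.asList(
                new Expense(121.687, "paid"),
                new Expense(78.03, "unpaid"),
                new Expense(102.014, "paid"),
                new Expense(100.0, "unpaid"));

        Totals totals = summarize(expenses);
        // prints: total=401.731 paid=223.701
        System.out.println("total=" + totals.total + " paid=" + totals.paid);
    }
}
```

    With Reactor on the classpath, the same accumulator could be passed as flux.reduce(Totals.ZERO, Totals::add). Because ZERO is never mutated, repeated subscriptions would each start from zero.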