Search code examples
spring-bootjdbcreactive-programmingspring-webfluxproject-reactor

How to Save Multiple Records using Web flux and JDBC?


I am trying to build a simple web application using spring boot - webflux (functional endpoints) & jdbc. The app receives payload in XML format (which is some details of 1 employee). Code given below persists data for one employee as expected.

public Mono<String> createData(final Mono<Record> inputMono) {
    final String someID = UUID.randomUUID().toString();

    final Mono<Integer> asyncUpdate = inputMono.flatMap(record -> {
        return beginUpdate(dataSource, 
  sqlStatementSharedAbove).withStatementEnricher(stmt -> {
            stmt.setString(1, record.getFirstName());
            stmt.setString(2, record.getLastName());
            stmt.setInt(3, record.getAddress());
        }).build();

    });
    return asyncUpdate.doOnSuccess(affectedRows -> LOGGER.debug("Added 
 {} rows with ID {}", affectedRows, someID))
        .map(affectedRows -> someID);
}

Now I need to save similar data for multiple employees (modifying the XML payload to contain multiple employee records)

In non-webflux world, I would just iterate over the list of employee objects and call this function for each one of them.

How can I achieve the same in webflux? Essentially I am looking to handle a saveAll functionality with webflux and given that I have to work with JDBC (I do understand that JDBC doesn't support non blocking paradigm and Mongo supports a saveAll API but I have certain constraints as to what DB i can use and therefore must make this work with JDBC)

Thank you.


Solution

  • Following code works to save multiple employee records. Essentially it needs a Flux (of Employees) to work with -

    
        public Mono<Void> createData(final Flux<Record> inputFlux) {
    
        return inputFlux.flatMap(record -> {
                return beginUpdate(dataSource, 
          sqlStatementSharedAbove).withStatementEnricher(stmt -> {
                    stmt.setString(1, record.getFirstName());
                    stmt.setString(2, record.getLastName());
                    stmt.setInt(3, record.getAddress());
                }).build().doOnSuccess(affectedRows -> LOGGER.info("Added rows{}", affectedRows));
    
            }).then;
        }