Search code examples
springr2dbc

Can Spring R2DBC execute batch insert and update at the same time?


Can Spring R2BC save and update a batch at the same time?

I get a list of users (1 million rows from a file for example). Some of these users are new and I need to INSERT them to the table, and some need to be UPDATED due to changed data. It is not clear from the file which are new and which are not. I'm considering user_id as the primary key

How can I describe this logic in code using Spring R2DBC?


Solution

  • It a little depend how much work you would like to do :)

    Select then insert/update

    The first way is quite simple to understand, but suboptimal in case of number of queries/database interactions.

    @Transactional
    Mono<User> save(User user){
        return repository.findById(user.getId())
                        .flatMap(found -> repository.save(found.copyMutableValuesFrom(user)))
                        .switchIfEmpty(repository.save(user));  
    }
    

    In first step you try to find user by id, then update (with fields rewrite in copyMutableValuesFrom) or insert.

    Upsert

    Second way is to use custom query:

    interface UserRepository extends ReactiveCrudRepository<User, Long>  {
    
        @Modifying
        @Query("""
    INSERT INTO user (id, firstname, lastname) VALUES(1, :firstname, :lastname)
        ON DUPLICATE KEY 
        UPDATE firstname=:firstname, lastname=:lastname
    """)
        Mono<Integer> maybeInsertMaybeUpdate(Long id, String firstname, String lastname);
    
    }
    

    This way limit number of queries, but strongly depends on database. Above query is for mySQL, but postgres version looks like:

    INSERT INTO user (id, firstname, lastname) VALUES(1, :firstname, :lastname)
       ON CONFLICT(id)
       DO
       UPDATE SET firstname=:firstname, lastname=:lastname
    

    Batch

    As in comment. You need to use construction like this:

       Flux<Long> insertProductColors(List<User> users){
            if (users.isEmpty()) return Flux.empty();
            return databaseClient.inConnectionMany { connection ->
                val statement = connection.createStatement(insertOrUpdateUserQuery)
                users.forEach(user ->
                    statement.bind(0, user.id).bind(1, user.firstName, user.lastname).add()
                );
                return statement.execute().toFlux().flatMap( result ->
                    result.map ( row, meta -> row.get("id", Long.class) )
                )
            }
        }
    

    I'm not sure everything here, because I get some of my code in kotlin and translated it here, but general idea is probably clear.