Tags: java, reactive-programming, spring-webflux, project-reactor, spring-data-r2dbc

Is processing all the rows from a table as Flux blocking or locking in R2DBC?


Is the following construct blocking or locking in any possible way? Is it a proper way of using R2DBC? If not, how else can I process all the records of a table in a reactive way?

My concern is what happens to the database connection, the table, and the Reactor threads before the whole Flux is consumed: whether I am blocking a thread, keeping the table locked, or tying up the database connection.

The purpose is to write a batch process over all the rows of a table. For each row I want to perform an activity that consists of fetching data from an external web service and, at the end, overwriting the original invoice in the very same table.

The assumption is that the single-row-processing method (called recalculateInvoice() here, sketched after the snippet below) is fully reactive.

import java.time.Duration;

import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import lombok.RequiredArgsConstructor;

@Repository
@RequiredArgsConstructor
public class DbConnector {
    private final DatabaseClient databaseClient;

    public Mono<Void> batchProcessing() {
        return databaseClient
                .sql("SELECT * FROM invoice")
                .fetch()
                .all()
                .delayElements(Duration.ofSeconds(10))
                // a costly operation with each row's data would go here
                .then();
    }
}
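
For illustration, the per-row activity described above could be plugged in where the delayElements placeholder sits now. This is only a sketch under the question's assumptions: recalculateInvoice(...) and the invoice columns (id, total) are hypothetical, and the web-service call would be composed inside that method.

import java.util.Map;

import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import lombok.RequiredArgsConstructor;

@Repository
@RequiredArgsConstructor
public class InvoiceBatchProcessor {
    private final DatabaseClient databaseClient;

    public Mono<Void> batchProcessing() {
        return databaseClient
                .sql("SELECT * FROM invoice")
                .fetch()
                .all()                               // Flux<Map<String, Object>>, one map per row
                .concatMap(this::recalculateInvoice) // process rows one at a time, staying reactive
                .then();
    }

    // hypothetical per-row step: fetch data from the web service (not shown),
    // then overwrite the original row in the same table
    private Mono<Void> recalculateInvoice(Map<String, Object> row) {
        return databaseClient
                .sql("UPDATE invoice SET total = :total WHERE id = :id")
                .bind("total", row.get("total"))     // placeholder for the recalculated value
                .bind("id", row.get("id"))
                .then();
    }
}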


Solution

  • Your code snippet is fully reactive. It does not block any thread or the database. This is the whole idea of using R2DBC.

    Under the hood, something like this will be performed by the R2DBC driver:

    Flux.usingWhen(connectionFactory.create(),
            connection -> 
                Flux.from(connection.createStatement("SELECT * FROM invoice").execute()),
            Connection::close);
    

    The database connection will be closed as soon as the pipeline completes or an error occurs.
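
    As a quick, self-contained illustration of that lifecycle (plain Reactor, no database; a string stands in for the Connection, and the cleanup also runs if the subscription is cancelled):

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class UsingWhenLifecycle {
        public static void main(String[] args) {
            Flux<Integer> rows = Flux.usingWhen(
                    Mono.just("connection"),                  // acquire the resource
                    conn -> Flux.range(1, 3),                 // use it to produce elements
                    conn -> Mono.fromRunnable(                // async cleanup on complete, error or cancel
                            () -> System.out.println("closed " + conn)));

            rows.subscribe(System.out::println);              // prints 1, 2, 3, then "closed connection"
        }
    }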

    You could read here about how backpressure is applied on such queries.
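
    Roughly, rows are only emitted as the downstream requests them, so demand can be shaped explicitly. A sketch (limitRate and the bounded flatMap are plain Reactor operators; recalculateInvoice(...) is the hypothetical per-row step from the question):

    // inside the same repository as the question's DbConnector
    public Mono<Void> batchProcessing() {
        return databaseClient
                .sql("SELECT * FROM invoice")
                .fetch()
                .all()
                .limitRate(100)                       // request rows from the driver 100 at a time
                .flatMap(this::recalculateInvoice, 8) // at most 8 rows being processed concurrently
                .then();
    }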