I have a service which I want to make @Transactional
. My service is storing complex data in multiple tables, there is a referential integrity between the tables.
public class MyData { // simplified
MasterData masterData;
List<Detail> details;
public static class MasterData {
UUID id;
String field1;
String field2;
String field3;
}
public static class Detail {
UUID masterId;
String fieldA;
String fieldB;
}
}
R2dbcRepository<MasterData, UUID>
. The INSERT
command is simple and I may use the R2dbcRepository
.DatabaseClient
. Each detail has a foreign key constraint to the master table. I want to use batch INSERT
and I complete the SQL using more complex approach in DatabaseClient
.The problem is that I cannot save the detail data - I get the error
insert or update on table "detail" violates foreign key constraint
I suspect that the reason is that each SQL command is executed in a different connection so the master data are not yet visible when the details are stored.
Is it really the root cause? How to make R2DBC always use the same connection across all the calls to the database inside one @Transactional
service call, even if it goes via various instances of R2dbcRepository
and DatabaseClient
?
If the solution is completely wrong, how to correctly implement @Transactional
in R2DBC?
I prefer calling all the INSERT
s into the detail
table in a batch.
My (simplified) code looks like this:
@Service
public class MyService {
private final MasterRepository masterRepository;
private final DbConnector dbConnector;
@Transactional
public Mono<Void> saveMasterAndDetails(MyData data) {
return Mono.just(data)
.map(MyData::getMaster)
.flatMap(masterRepository::insertMasterData)
.thenReturn(data)
.map(MyData::getDetails)
.flatMap(dbConnector::insertDetails)
.then()
;
}
}
The code of MasterRepository
is something like
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface MasterRepository extends R2dbcRepository<MasterData, UUID> {
@Query("""
INSERT INTO master(id, col_1, col_2, col_3)
VALUES (
:#{#masterData.id},
:#{#masterData.field1},
:#{#masterData.field2},
:#{#masterData.field3})
""")
Mono<Void> insertMasterData(MasterData masterData);
}
And the code of DbConnector
is more complex - but maybe overly complex? There is still missing direct support for batches and prepared statements in DatabaseClient
: spring-data-r2dbc-259, spring-framework-27229
import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Integer> insertDetails(List<Detail> details) {
// usingWhen() is the reactive analogy of "use-with-resources"
return Flux.usingWhen(
// "try(Create the resource)"
databaseClient.getConnectionFactory().create(),
// "{ the body }"
connection -> {
final Statement statement = connection.createStatement("""
insert into detail (masterId, col_A, col_B)
values ($1, $2, $3)
""");
details.forEach(detail ->
statement
.bind("$1", detail.getMasterId())
.bind("$2", detail.getColA())
.bind("$3", detail.getColB())
.add()
);
return statement.execute();
},
// "finally close()"
Connection::close)
.flatMap(Result::getRowsUpdated)
.reduce(0, Integer::sum);
}
}
I replaced
return Flux.usingWhen(
databaseClient.getConnectionFactory().create(),
connection -> {
statement = ... // prepare the statement
return statement.execute();
},
Connection::close
)
with
return databaseClient.inConnection(connection -> {
statement = ... // prepare the statement
return statement.execute();
);
I have discovered a method which I was not aware of: DatabaseConnection.inConnection(). The DatabaseConnection
interface inherits it from ConnectionAccessor
:
Execute a callback Function within a Connection scope. The function is responsible for creating a Mono. The connection is released after the Mono terminates (or the subscription is cancelled). Connection resources must not be passed outside of the Function closure, otherwise resources may get defunct.
I changed my code using the DatabaseClient
and it seems that the SQL commands are executed in the same connection.
However, I would still like to understand it better. I am not sure, if I am just lucky now and if it can change with the next implementation. I still do not know how to have the full control over the connections and hence over the transactional code.
import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Integer> insertDetails(List<Detail> details) {
return databaseClient.inConnection(connection -> {
final Statement statement = connection.createStatement("""
insert into detail (masterId, col_A, col_B)
values ($1, $2, $3)
""");
details.forEach(detail ->
statement
.bind("$1", detail.getMasterId())
.bind("$2", detail.getColA())
.bind("$3", detail.getColB())
.add()
);
return Flux.from(statement.execute())
.flatMap(Result::getRowsUpdated)
.reduce(0, Integer::sum)
;
});
}