In a non-reactive app, you can simply get the connection from the DataSource
.
In Reactive, I found ConnectionFactory
, but it returns an org.reactivestreams.Publisher
. With that, it seems you can only subscribe()
an org.reactivestreams.Subscriber
.
When you get a publisher then you need to wrap it in either a Mono::from
or a Flux::from
in order to be able to subscribe to it. For a connection you should wrap it in a Mono::from
and cache it like so.
private final Mono<? extends Connection> connection;
public SomeEntityDao() {
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, H2_DRIVER)
.option(PASSWORD, "")
.option(URL, "mem:test;DB_CLOSE_DELAY=-1")
.option(USER, "sa")
.build());
connection = Mono.from(connectionFactory.create()).cache();
}
When you want to use it then you can Mono::flatMap
the connection and create a statement. Since you did a Mono::flatMap
on the statement then you can do a Mono::flatMap
on the result to continue the flow.
public Mono<SomeEntity> findById(Long id) {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity where id = $1")
.bind("$1", id)
.execute()))
.flatMap(result -> Mono.from(result.map(mapper)));
}
FYI: My mapper
is a function.
private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, rowMetadata) -> {
SomeEntity someEntity = new SomeEntity();
someEntity.setId(row.get("id", Long.class));
someEntity.setName(row.get("name", String.class));
return someEntity;
};
If you are returning more than one Entity, then use Mono::flatMapMany
public Flux<SomeEntity> findAll() {
return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity")
.execute()))
.flatMapMany(result -> result.map(mapper));
}
Ultimately though, if you are using R2DBC, then you should also include the reactive connection pool. Example given at WebFluxR2dbc.