spring-boot database-connection reactive-programming

How to get a Connection from a Reactive app


In a non-reactive app, you can simply get the connection from the DataSource.

In a reactive app, I found ConnectionFactory, but its create() method returns an org.reactivestreams.Publisher. With that, it seems the only thing you can do is subscribe() an org.reactivestreams.Subscriber to it.


Solution

  • A bare Publisher only offers subscribe(), so wrap it with either Mono::from or Flux::from to get Reactor's operators. A connection is a single value, so wrap it with Mono::from and cache it (cache() replays the same connection to every later subscriber instead of opening a new one), like so:

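    // Assumes static imports of DRIVER, PASSWORD and USER from io.r2dbc.spi.ConnectionFactoryOptions,
    // and of H2_DRIVER and URL from io.r2dbc.h2.H2ConnectionFactoryProvider.
    private final Mono<? extends Connection> connection;
    
    public SomeEntityDao() {
        ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(DRIVER, H2_DRIVER)
                .option(PASSWORD, "")
                .option(URL, "mem:test;DB_CLOSE_DELAY=-1")
                .option(USER, "sa")
                .build());
        // Wrap the Publisher in a Mono and cache it so every subscriber reuses one connection.
        connection = Mono.from(connectionFactory.create()).cache();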
    }
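
For comparison, this is roughly what consuming a Publisher by hand, without Mono::from, involves. The sketch uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones, so it runs without Reactor or R2DBC on the classpath. The names singleValuePublisher and firstItem, and the String standing in for a Connection, are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

/**
 * Sketch only: java.util.concurrent.Flow stands in for org.reactivestreams,
 * and a String stands in for an R2DBC Connection.
 */
public class RawSubscriberSketch {

    /** Hypothetical publisher that emits exactly one value on first request, then completes. */
    static Flow.Publisher<String> singleValuePublisher(String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes by hand and exposes the first item as a CompletableFuture. */
    static <T> CompletableFuture<T> firstItem(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1); // nothing is emitted until demand is signalled
            }

            @Override
            public void onNext(T item) {
                future.complete(item);
            }

            @Override
            public void onError(Throwable throwable) {
                future.completeExceptionally(throwable);
            }

            @Override
            public void onComplete() {
                future.complete(null); // no effect if onNext already completed the future
            }
        });
        return future;
    }

    public static void main(String[] args) {
        String connection = firstItem(singleValuePublisher("connection-1")).join();
        System.out.println(connection); // prints "connection-1"
    }
}
```

Wrapping with Mono::from saves writing this plumbing by hand and adds operators such as flatMap and cache.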
    

    To use it, Mono::flatMap the connection to create and execute a statement. Because that flatMap yields a Mono of the Result, you can Mono::flatMap again on the result to map its rows and continue the flow.

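    public Mono<SomeEntity> findById(Long id) {
        // execute() returns a Publisher of Result; Mono.from keeps the first Result it emits.
        return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity where id = $1")
                        .bind("$1", id)
                        .execute()))
                // result.map(...) is a Publisher of mapped rows; Mono.from takes the first one.
                .flatMap(result -> Mono.from(result.map(mapper)));
    }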
    

    FYI: the mapper is a BiFunction that turns a Row and its RowMetadata into a SomeEntity.

    private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, rowMetadata) -> {
        SomeEntity someEntity = new SomeEntity();
        someEntity.setId(row.get("id", Long.class));
        someEntity.setName(row.get("name", String.class));
        return someEntity;
    };
    

    If you are returning more than one entity, use Mono::flatMapMany, which turns the Mono of the Result into a Flux of mapped rows:

    public Flux<SomeEntity> findAll() {
        return connection.flatMap(con -> Mono.from(con.createStatement("select * from some_entity")
                        .execute()))
                .flatMapMany(result -> result.map(mapper));
    }
    

    Ultimately, though, if you are using R2DBC you should also include the reactive connection pool (r2dbc-pool). An example is given at WebFluxR2dbc.
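
One reason a pool may be preferable to the cached Mono used earlier: caching means the factory is asked for a connection once, and every later subscriber receives that same connection. Here is a minimal sketch of that sharing, assuming CompletableFuture as a stand-in for Mono and a hypothetical FakeConnectionFactory in place of a real ConnectionFactory. It is not how Reactor implements cache(); it only shows the observable effect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Sketch only: shows that a cached source hands the same connection to every caller. */
public class CachedConnectionSketch {

    /** Hypothetical factory that counts how many connections it has opened. */
    static final class FakeConnectionFactory {
        private final AtomicInteger opened = new AtomicInteger();

        CompletableFuture<String> create() {
            // A String stands in for a real Connection.
            return CompletableFuture.completedFuture("connection-" + opened.incrementAndGet());
        }

        int openedCount() {
            return opened.get();
        }
    }

    /** Runs the source once and replays its result to every later caller. */
    static <T> Supplier<CompletableFuture<T>> cache(Supplier<CompletableFuture<T>> source) {
        return new Supplier<CompletableFuture<T>>() {
            private CompletableFuture<T> cached;

            @Override
            public synchronized CompletableFuture<T> get() {
                if (cached == null) {
                    cached = source.get(); // the first caller triggers the real work
                }
                return cached; // later callers share the same result
            }
        };
    }

    public static void main(String[] args) {
        FakeConnectionFactory factory = new FakeConnectionFactory();
        Supplier<CompletableFuture<String>> connection = cache(factory::create);

        String first = connection.get().join();
        String second = connection.get().join();

        System.out.println(first + " " + second + " opened=" + factory.openedCount());
        // prints "connection-1 connection-1 opened=1"
    }
}
```

With a real database, this would mean every query from findById and findAll goes through the one shared connection, whereas a pool hands out a connection per use.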