Search code examples
javaspring-webfluxproject-reactorreactor

How to create reactive 'intermediate subscriber'? How to combine all Flux<T> parts and process result of them further in chain (convert to Mono<R>)?


I'm playing around with spring webflux + mongodb-reactive to save binary files (images) to Mongo DB. Unfortunately spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE has no support for reactive programming for GridFsTemplate functionality. So I decide to create a Subscriber which will take all DataBuffer parts, combine them and transform to InputStream and so GridFsTemplate::store will be possible:

public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;

public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
    this.gridFsTemplate = gridFsTemplate;
    this.fileName = fileName;
    dataBuffers = new ArrayList<>();
}

public void hookOnNext(DataBuffer dataBuffer) {
    dataBuffers.add(dataBuffer);
    request(1);
}

public void hookOnComplete() {
    DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
    InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
    ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}

The problem is that i would like to return objectId for further processing but hookOnComplete is type of void. Even more.. I would like to get from here Mono of ObjectId, so I could process it further in reactive manner. In that case, as I understand react philosophy, I shouldn't use 'real' subscriber but something that would combine results from Flux<T> and when onComplete, return Mono<R>. Is project-reactor has such capability? I'm new in reactive programming so probably I could miss whole idea so please guide me how to achieve that.

In my previous solution I used block() to end reactive chain so i got ObjectId and next i emitted object id with new chain. But this is for sure not a good solution.


Solution

  • Unfortunately, that whole approach is asking for trouble and this is not advised.

    Implementing a Subscriber (even using the BaseSubscriber) is almost never a solution. If the driver library you're using does not support reactive streams, then you should probably wrap it as blocking code.

    Doing so will lose many features on the way (runtime behavior, backpressure, etc), but at least you won't risk blowing up your whole application if your subscriber implementation is not perfect.

    Until your core libraries/drivers support reactive streams, you should probably stick with Spring MVC, which supports async concepts and Flux Mono return types in some cases.