I'm experimenting with Spring WebFlux and reactive MongoDB to save binary files (images) to MongoDB. Unfortunately, spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE has no reactive support for GridFsTemplate functionality. So I decided to create a Subscriber that takes all the DataBuffer parts, combines them and converts them to an InputStream, so that GridFsTemplate::store can be called:
public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {

    private GridFsTemplate gridFsTemplate;
    private List<DataBuffer> dataBuffers;
    private String fileName;

    public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
        this.gridFsTemplate = gridFsTemplate;
        this.fileName = fileName;
        dataBuffers = new ArrayList<>();
    }

    public void hookOnNext(DataBuffer dataBuffer) {
        dataBuffers.add(dataBuffer);
        request(1);
    }

    public void hookOnComplete() {
        DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
        InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
        ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
    }
}
The problem is that I would like to return objectId for further processing, but hookOnComplete returns void. Moreover, I would like to get a Mono of ObjectId from here, so that I can process it further in a reactive manner. As I understand the reactive philosophy, in that case I shouldn't use a 'real' subscriber but something that combines the results from a Flux<T> and, on completion, returns a Mono<R>. Does Project Reactor have such a capability? I'm new to reactive programming, so I may be missing the whole idea; please guide me on how to achieve that.
In my previous solution I used block() to end the reactive chain, so I got the ObjectId, and then I emitted the object id in a new chain. But this is surely not a good solution.
Unfortunately, that whole approach is asking for trouble and is not advised.
Implementing a Subscriber (even using the BaseSubscriber) is almost never a solution. If the driver library you're using does not support reactive streams, then you should probably wrap it as blocking code.
Doing so, you will lose many features along the way (runtime behavior, backpressure, etc.), but at least you won't risk blowing up your whole application if your subscriber implementation is not perfect.
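As a rough illustration of "wrap it as blocking code", here is a minimal sketch using only reactor-core. It is not the official way to use GridFsTemplate. The class name BlockingStoreBridge, the byte[] chunks (standing in for DataBuffer parts) and the blockingStore function (standing in for a call such as gridFsTemplate.store(inputStream, fileName)) are all assumptions made for the example. It collects the Flux into one buffer, then runs the blocking call inside Mono.fromCallable on a scheduler supplied by the caller, so the result comes back as a Mono without a custom Subscriber or block().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

// Hypothetical helper, not part of Spring or Reactor.
public final class BlockingStoreBridge {

    private BlockingStoreBridge() {
    }

    /**
     * Buffers every chunk in memory, then invokes the blocking store call
     * on the given scheduler and exposes its result as a Mono.
     *
     * @param chunks            file content, emitted piece by piece
     * @param blockingStore     assumed stand-in for a blocking driver call
     * @param blockingScheduler scheduler whose threads are allowed to block
     */
    public static <R> Mono<R> store(Flux<byte[]> chunks,
                                    Function<InputStream, R> blockingStore,
                                    Scheduler blockingScheduler) {
        return chunks
                // Accumulate all chunks into a single in-memory buffer.
                .reduceWith(ByteArrayOutputStream::new, (out, chunk) -> {
                    out.write(chunk, 0, chunk.length);
                    return out;
                })
                // Defer the blocking call until subscription and move it
                // off the event-loop threads.
                .flatMap(out -> Mono
                        .fromCallable(() -> blockingStore.apply(
                                new ByteArrayInputStream(out.toByteArray())))
                        .subscribeOn(blockingScheduler));
    }
}
```

With the question's types, the function argument would presumably be something like in -> gridFsTemplate.store(in, fileName), and Spring's DataBufferUtils.join could probably replace the manual byte accumulation. For the scheduler, Schedulers.elastic() is the usual choice on older Reactor versions and Schedulers.boundedElastic() on Reactor 3.3 and later. Note that the whole file is still held in memory, so backpressure is effectively lost, as described above.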
Until your core libraries/drivers support reactive streams, you should probably stick with Spring MVC, which supports async concepts and Flux/Mono return types in some cases.