I have a class that populates a Map from a Flux source. I want to be able to call updateTheData() on the fly, but I want getTheData() to return a Mono that resolves to the current or pending data.
Basically, if theData != null and no Flux is in progress, return Mono.just(theData). Otherwise, return a Mono that will eventually emit theData.
Edit: This is the best I can do so far:
class SomeClass {
    private Mono<Map<String, Data>> theData;
    private final SomeFluxService someFluxService = new SomeFluxService();

    public SomeClass() {
        updateTheData();
    }

    public void updateTheData() {
        someFluxService.get()
            .collectMap(Data::getId, Function.identity())
            .subscribe(d -> this.theData = Mono.just(d));
    }

    public Mono<Map<String, Data>> getTheData() {
        return this.theData;
    }
}
But there is still the problem that, before updateTheData() completes for the first time, getTheData() will return null.
Help appreciated!
"But there is still the problem that, before updateTheData() completes for the first time, getTheData() will return null."
This is because the consumer you pass to subscribe() only assigns the field once the data has been emitted, so the field stays null until then. This is a bit of an odd approach; it would be better to simply call cache() on the Mono and assign it to your field straight away:
public void updateTheData() {
    theData = someFluxService.get()
        .collectMap(Data::getId, Function.identity())
        .cache();
}
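To make the behaviour concrete, here is a minimal, self-contained sketch of the cache() approach. The Data class and the fetch() method are hypothetical stand-ins for the question's Data and someFluxService.get(), and the subscription counter and the volatile modifier are my own additions for illustration. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedMonoSketch {

    // Hypothetical stand-in for the question's Data class.
    public static class Data {
        private final String id;
        public Data(String id) { this.id = id; }
        public String getId() { return id; }
    }

    // Counts subscriptions to the source, to show that cache() replays instead of re-fetching.
    public final AtomicInteger sourceSubscriptions = new AtomicInteger();

    // volatile (an assumption, not in the original) so a reassignment is visible to other threads.
    private volatile Mono<Map<String, Data>> theData;

    public CachedMonoSketch() {
        updateTheData();
    }

    // Hypothetical stand-in for someFluxService.get(): two items, each slightly delayed.
    private Flux<Data> fetch() {
        return Flux.defer(() -> {
            sourceSubscriptions.incrementAndGet();
            return Flux.just(new Data("a"), new Data("b"))
                       .delayElements(Duration.ofMillis(50));
        });
    }

    public void updateTheData() {
        // The field is assigned immediately, so getTheData() never returns null.
        theData = fetch()
                .collectMap(Data::getId, Function.identity())
                .cache();
    }

    public Mono<Map<String, Data>> getTheData() {
        return theData;
    }

    public static void main(String[] args) {
        CachedMonoSketch sketch = new CachedMonoSketch();
        // The first subscriber triggers the fetch and waits for the pending result.
        Map<String, Data> first = sketch.getTheData().block();
        // The second subscriber gets the cached map without a new fetch.
        Map<String, Data> second = sketch.getTheData().block();
        System.out.println(first.keySet());
        System.out.println(first == second);
        System.out.println(sketch.sourceSubscriptions.get());
    }
}
```

Note that cache() is lazy: the source is only subscribed to when the first caller subscribes to the returned Mono, and anyone subscribing while the fetch is still in flight receives the map once it completes.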
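Taking that thinking a step further, if your requirement is to refresh the data every x seconds rather than strictly on demand, you can pass that duration to cache() as a time-to-live and do away with the separate method entirely. The source is then re-subscribed by the first subscriber that arrives after the cached value has expired: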
public NewClass() {
    theData = someFluxService.get()
        .collectMap(Data::getId, Function.identity())
        .cache(Duration.ofMinutes(5));
}

public Mono<Map<String, Data>> getTheData() {
    return theData;
}
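As a rough illustration of how the time-to-live variant behaves, the sketch below uses a very short TTL and a counter in place of the real service. The class name, the counter and the timings are all made up for the demo, and it assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class TtlCacheSketch {

    // Returns the values seen by three successive subscribers.
    public static int[] demo() {
        // Hypothetical source: each real fetch increments the counter.
        AtomicInteger fetches = new AtomicInteger();
        Mono<Integer> cached = Mono.fromCallable(fetches::incrementAndGet)
                .cache(Duration.ofMillis(200));

        int first = cached.block();   // triggers the first fetch
        int second = cached.block();  // served from the cache, no new fetch
        sleep(600);                   // wait well past the 200 ms TTL
        int third = cached.block();   // cache expired, so the source is fetched again
        return new int[] { first, second, third };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] seen = demo();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]);
    }
}
```

The refresh is driven by subscribers rather than by a timer: if nobody calls getTheData() after the value expires, nothing is fetched until the next caller arrives, and that caller waits for the fresh result.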