I have a reactive stream that fetches some data, iterates over it, processes it, and finally writes it to Kafka:
public Flux<M> sendData() {
    // Return the Flux instead of subscribing here, so the caller decides when it runs.
    return Flux.fromIterable(o.getC())
        .publishOn(Schedulers.boundedElastic())
        .flatMap(id -> Flux.fromIterable(getM(id))
            .publishOn(Schedulers.boundedElastic())
            .flatMap(n -> Flux.fromIterable(o.getD())
                .publishOn(Schedulers.boundedElastic())
                .flatMap(d -> Flux.just(sendToKafka))))
        // Log failures; doOnError only observes the error, it does not recover from it.
        .doOnError(throwable ->
            log.debug("Error while reading data : {} ", throwable.getMessage()));
}

public void run(String... args) {
    // Nothing happens until the returned Flux is subscribed to.
    sendData().subscribe();
}
I want this workflow to run every minute. Can someone help me understand how to schedule this within the stream?
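To run the workflow once per minute, you can drive it from Flux.interval. This assumes sendData() returns its Flux without subscribing internally, so that flatMap can subscribe to it on every tick. onBackpressureDrop() discards ticks the downstream has not requested instead of failing with an overflow error.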
Flux.interval(Duration.ofMinutes(1))
    .onBackpressureDrop()
    .flatMap(n -> sendData())
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();
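If runs should not overlap and one failed run should not stop the schedule, a variation along these lines might help. It is only a sketch under assumptions: the class name IntervalSketch, the helper runPeriodically, and the stand-in sendData(long tick) are hypothetical and replace the real Kafka pipeline with two dummy items per tick. It swaps flatMap for concatMap, which waits for each run to complete before subscribing to the next one, and adds onErrorResume inside the mapper, because an error that reaches the outer stream would otherwise terminate the interval. The take(runs) call exists only so the demo finishes.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class IntervalSketch {

    // Hypothetical stand-in for sendData(): emits two items labelled with the tick number.
    static Flux<String> sendData(long tick) {
        return Flux.just("a", "b").map(item -> tick + ":" + item);
    }

    // Runs sendData once per period for `runs` ticks and collects everything it emitted.
    static List<String> runPeriodically(Duration period, int runs) {
        return Flux.interval(period)
                // Drop ticks the downstream has not requested instead of signalling overflow.
                .onBackpressureDrop()
                // Demo only: stop after `runs` ticks; omit this to keep running indefinitely.
                .take(runs)
                // concatMap starts the next run only after the previous one has completed.
                .concatMap(tick -> sendData(tick)
                        // Swallow a failed run so the schedule itself stays alive.
                        .onErrorResume(e -> Flux.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Short period for the demo; use Duration.ofMinutes(1) for the once-per-minute case.
        System.out.println(runPeriodically(Duration.ofMillis(100), 3));
    }
}
```

With three ticks this prints [0:a, 0:b, 1:a, 1:b, 2:a, 2:b]. In a real application you would drop take(...), collectList() and block(), and end the chain with subscribe() as in the snippet above.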