Search code examples
java-8neo4jproject-reactorspring-data-neo4j

How to achieve wait/notify in flux.subscribe in java


I have just started with functional programming in java and facing some difficulty. I'm writing a method to establish a reactive session with database and return flux object to the caller. Caller will then subscribe to this flux and fetch results accordingly. Trying to mimic this example here

What I have is

return  Flux.usingWhen(
         Mono.just(getDataStore().getRxSession()),
         session -> Flux.from(session.run(query).records()),
         RxSession::close);

and then a different function subscribing to this flux

Flux<Record> rflux =  query.sub();
rflux.takeUntil(//Implement wait and notifier here).subscribe(//notify here);

Solution

  • Found my answer here : https://www.baeldung.com/reactor-core

    I can use Subscription interface methods to request next set of records.

    Flux<Record> rflux =  query.sub();
    rflux.subscribe(new Subscriber<Record>() {
    private Subscription s;
    @Override
    public void onSubscribe(Subscription sub) {
        this.s = sub;
        s.request(reqParam);
    }
    
    @Override
    public void onNext(Record record) {
        System.out.println("The record that is pulled is " + record.toString());
        s.request(reqParam);
    }
    
    @Override
    public void onError(Throwable throwable) {
    
    }
    
    @Override
    public void onComplete() {
    
    }
    

    });