java, rx-java, reactive-programming

RxJava: collect AMQP events in an Observable and subscribe with a buffer


I have to collect some AMQP events and then print them every 10 seconds using a buffer.

private Observable<Event> obs = Observable.empty(); 
private final Disposable disposable = obs.buffer(10, SECONDS)
                              .retry(t -> true)
                              .subscribe(System.out::println);

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
      obs = obs.concatWith(Observable.just(event));
}

Event is the message type, and handle is the consumer callback that is invoked for each message.

When I debug this code it prints only an empty list, which makes sense because obs is empty at the time I subscribe.

How can I add (concat?) events to this Observable so that the subscription keeps running and printing? Thanks.


Solution

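  • You need a Subject, which is both an Observable you can subscribe to and an Observer you can push into. Reassigning obs in handle() only creates a new Observable; it does not change the pipeline that was already subscribed. New elements are pushed into a Subject with onNext(T element).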

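    // ReplaySubject caches every event for late subscribers; if replay is not
    // needed, PublishSubject.create() avoids the unbounded cache.
    private final Subject<Event> subject = ReplaySubject.create();

    @Override
    public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
         // Push each incoming message into the subject.
         subject.onNext(event);
    }

    public Observable<Event> getObservable() {
        // hide() is the RxJava 2/3 name; RxJava 1 called this asObservable().
        return subject.hide();
    }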
    

    You can then subscribe to the Observable returned by getObservable(), for example with buffer(10, SECONDS) as in the question.
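
    For completeness, here is a minimal end-to-end sketch of the approach, assuming RxJava 3 (io.reactivex.rxjava3) is on the classpath. The class name BufferedEventPrinter, the start(...) method, and the use of String in place of the question's Event type are all hypothetical stand-ins, not part of the original code. It uses a serialized PublishSubject on the assumption that events do not need to be replayed and that handle may be called from more than one consumer thread. A TestScheduler drives the clock in main so the example runs instantly.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class; String stands in for the question's Event type.
public class BufferedEventPrinter {

    // toSerialized() makes onNext safe to call from several threads.
    private final Subject<String> subject =
            PublishSubject.<String>create().toSerialized();

    // Stand-in for handle(Event, MessageContext): push the event into the subject.
    public void handle(String event) {
        subject.onNext(event);
    }

    // Collect events into time windows and pass each non-empty batch to the sink.
    public Disposable start(long span, TimeUnit unit, Scheduler scheduler,
                            Consumer<List<String>> sink) {
        return subject
                .buffer(span, unit, scheduler)
                .filter(batch -> !batch.isEmpty()) // skip windows with no events
                .subscribe(sink);
    }

    public static void main(String[] args) {
        // Virtual-time scheduler so the 10 second window elapses immediately.
        TestScheduler scheduler = new TestScheduler();
        BufferedEventPrinter printer = new BufferedEventPrinter();
        Disposable disposable =
                printer.start(10, TimeUnit.SECONDS, scheduler, System.out::println);

        printer.handle("a");
        printer.handle("b");
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS); // closes the first window

        disposable.dispose();
    }
}
```

    In a real consumer you would likely pass Schedulers.computation() instead of the TestScheduler and keep the Disposable around until shutdown. The filter step is optional; without it, an empty list is emitted for every window in which no event arrived.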