I have to collect some AMPQ events and then print them every 10 sec using a buffer.
private Observable<Event> obs = Observable.empty();
private final Disposable disposable = obs.buffer(10, SECONDS)
.retry(t -> true)
.subscribe(System.out::println);
@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
obs = obs.concatWith(Observable.just(event));
}
Event is the message and void handle is the consumer.
I debug this code and it print only an empty list, and it makes sense because obs is empty.
How can I add (concat?) events into this Observable and execute the disposable continually? Thanks.
You need a Subject which you can subscribe to.
New elements can be pushed into a Subject using next(T element)
private Subject<Event> subject = ReplaySubject.create();
@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
subject.next(event);
}
public Observable<Event> getObservable() {
return subject.asObservable();
}
You can subscribe to the observable, that is returned by the getObservable()
method.