RxJava cache last item for future subscribers


I have implemented a simple RxEventBus which starts emitting events even if there are no subscribers. I want to cache the last emitted event, so that when the first (or any later) subscriber subscribes, it receives only one item: the last one.

I created a test class which demonstrates my problem:

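import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import rx.Observable;
import rx.Subscriber;
import rx.subjects.BehaviorSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class RxBus {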

ApplicationsRxEventBus applicationsRxEventBus;

public RxBus() {
    applicationsRxEventBus = new ApplicationsRxEventBus();
}

public static void main(String[] args) {
    RxBus rxBus = new RxBus();
    rxBus.start();
}

private void start() {
    ExecutorService executorService = Executors.newScheduledThreadPool(2);

    Runnable runnable0 = () -> {
        while (true) {
            long currentTime = System.currentTimeMillis();
            System.out.println("emiting: " + currentTime);
            applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    Runnable runnable1 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 1: " + applicationsEvent.number);
                }
            });

    Runnable runnable2 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 2: " + applicationsEvent.number);
                }
            });


    executorService.execute(runnable0);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable1);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable2);
}

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus.cache();
    }

    public void emit(ApplicationsEvent event) {
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable;
    }
}
private class ApplicationsEvent {
    long number;

    public ApplicationsEvent(long number) {
        this.number = number;
    }
}
}

runnable0 emits events even if there are no subscribers. runnable1 subscribes after 3 seconds and receives the last item (this is fine). But runnable2 subscribes 3 seconds after runnable1 and receives all the items that runnable1 has received. I need runnable2 to receive only the last item. I tried caching the last event in RxBus myself:

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    private ApplicationsEvent event;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus;
    }

    public void emit(ApplicationsEvent event) {
        this.event = event;
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable.doOnSubscribe(() -> emit(event));
    }
}

But the problem is that when runnable2 subscribes, runnable1 receives the event twice:

emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627

I am sure there is an RxJava operator for this. How can I achieve it?


Solution

  • Your ApplicationsRxEventBus does extra work: it re-emits a stored event whenever someone subscribes, on top of all the cached events.

    You only need a single BehaviorSubject + toSerialized. It holds onto the very last event and re-emits it to each new Subscriber by itself, so neither cache() nor the doOnSubscribe re-emit is needed.
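
    To make the "latest item only" behaviour concrete without needing RxJava on the classpath, here is a minimal plain-Java sketch of the semantics described above. LatestValueBus and LatestValueBusDemo are hypothetical names invented for this illustration; this is not how RxJava implements BehaviorSubject, only a stand-in that behaves the same way for this scenario. In the real bus, the subject itself would play this role and getBus() would return it directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a serialized BehaviorSubject; not RxJava code.
final class LatestValueBus<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;          // most recent event, meaningful only if hasValue
    private boolean hasValue;  // false until the first emit

    // Deliver the event to current subscribers and remember it for later ones.
    synchronized void emit(T event) {
        latest = event;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // Replay only the latest event to the new subscriber, then register it.
    // Existing subscribers are not touched, so nobody sees a duplicate.
    synchronized void subscribe(Consumer<T> subscriber) {
        if (hasValue) {
            subscriber.accept(latest);
        }
        subscribers.add(subscriber);
    }
}

final class LatestValueBusDemo {
    public static void main(String[] args) {
        LatestValueBus<Long> bus = new LatestValueBus<>();
        List<String> log = new ArrayList<>();

        bus.emit(1L);                                     // nobody listening yet
        bus.emit(2L);
        bus.subscribe(n -> log.add("runnable 1: " + n));  // replays 2 only
        bus.emit(3L);
        bus.subscribe(n -> log.add("runnable 2: " + n));  // replays 3 only
        bus.emit(4L);

        log.forEach(System.out::println);
    }
}
```

    In this sketch the second subscriber sees only the most recent value (3) on subscription, and the first subscriber does not receive it a second time, which is the behaviour the question asks for.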