Search code examples
rx-javaobservablenetflixrx-android

rxjava - cache from observerables using concat not working


Following this tutorial I tought i'd create a fake memory,disk, network objects to see there order if one fails. My goal was to see if one failed it concat move to the next one as per the articles comment:

The key to this pattern is that concat() only subscribes to each child Observable when it needs to. There's no unnecessary querying of slower sources if data is cached, since first() will stop the sequence early. In other words, if memory returns a result, then we won't bother going to disk or network. Conversely, if neither memory nor disk have data, it'll make a new network request.

i then created the following test:

 Data disk=new Data();
    disk.setAddress("38 Oriole");
    disk.setPostalCode("mc72l9");
    disk.setDate("feb 12");

Data network=new Data();
network.setAddress("39 skyway");
network.setPostalCode("mt82l9");
network.setDate("feb 13");


Observable observerMemory = Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(final Subscriber subscriber) {
                subscriber.onNext(null);
            }}
);


Observable<Data> observerDisk = Observable.from(disk);
Observable<Data> observerNetwork = Observable.from(network);

Observable.concat(observerMemory, observerDisk, observerNetwork).first().subscribe(new Subscriber<Data>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        System.out.println("****" + e);
    }

    @Override
    public void onNext(Data data) {
        System.out.println(data.toString());
    }
});

However, when running this test i was expecting it to print the disk observable (stream) . so the output should have been 38 Oriole etc since the very first observable was returning null and concat should get the next stream if one fails i thought. I mean as per the article if one stream failed such as the memory stream then concat should go to the next stream which is disk. But for me it did no such thing. The emission stopped at the first error of null. What am i doing wrong. I am doing this fake test to ensure this way works before i continue to make a real cache system using reactive programming.

by the way here is the rxjava dependency im using:

compile 'com.netflix.rxjava:rxjava-android:0.20.7'

Solution

  • The problem is that observerMemory is calling onNext(null) when subscribed to. To get concat to move on to the next Observable, observerMemory needs to complete. If you replace subscriber.onNext(null) with subscriber.onComplete() the first emitted item will then be from disk.