Search code examples
javaandroidrx-javareactive-programmingrx-android

Issue with using 'map' in RxJava 2 / RxAndroid 2


I'm trying to learn RXJAVA for Android. Parts make sense and I'm still confused about a lot of the other bits but, given some time I hope it will all make a lot more sense.

At present I'm having trouble with the 'map' functionality. I'm receiving an error but cannot quite figure out how to resolve it.

Before I share my code, I'll explain my understanding.. At a simple level.. Observable - Code that emits data. Observer - Code that processes the emitted data. Map - Code that takes in data of type A and returns it processed or as type B.

So, with this in mind:

In gradle I have:

compile 'io.reactivex.rxjava2:rxjava:2.0.1' 
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

If I have:

    //declaration at top of file
    private Observable<Integer> myIntObservable;
    private Observer<Integer> myIntObserver;
    private Observer<String> myStringObserver;
    private Observable<String> myStringObservable;

    //usage in a function
    myIntObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer value) {
            Toast.makeText(getApplicationContext(), "" + value, Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {
            Toast.makeText(getApplicationContext(), "Int Observer Async Complete", Toast.LENGTH_SHORT).show();
        }
    };


    //Connect my Observable to the observer.
    myIntObservable.observeOn(Schedulers.io());
    myIntObservable.subscribeOn(AndroidSchedulers.mainThread());
    myIntObservable.subscribe(myIntObserver);

This all works fine... my map usage is similar..

What I would like to do is use this same observable that returns an int, then use the map code to instead return a string...

Therefore:

    myStringObservable
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer query){
                    return "String Observable result == " + query;
                }
            });

Now, I have two issues:

a) The build error I receive is: Error:(179, 17) error: method map in class Observable cannot be applied to given types; required: Function found: > reason: cannot infer type-variable(s) R (argument mismatch; > cannot be converted to Function) where R,T are type-variables: R extends Object declared in method map(Function) T extends Object declared in class Observable

I believe that this is essentially telling me that the types are not correct for my usage but, I can't clearly see... how to resolve this.

b) The map code that I have posted above doesn't connect the observable to what it needs to observe... hence, should I add the subscribe line before the map command?

Therefore, I tried this..

public void setupAsyncSubscription(){

    myIntObservable
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(myIntObserver)
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer query){
                    return "String Observable result == " + query;
                }
            });

}

Whilst this removes the error detailed in 'a' it instead provides me with the following error:

Error:(180, 17) error: void cannot be dereferenced (this points to the 'map' call)

Finally, I can also see that what I 'return' back from the 'map' function isn't being processed at all... I'm not clear how to process that. I feel that I should be using the .subscribe call in that case??

I 'think' that I am slowly on the right path to resolving the issue(s), but I'm not quite there and I don't want to just try and happen upon the answer without understanding what the problem is.

As always, any help is appreciated.


Solution

  • You have multiple issues here. There are - one by one:

    //Connect my Observable to the observer.
    myIntObservable.observeOn(Schedulers.io());
    myIntObservable.subscribeOn(AndroidSchedulers.mainThread());
    myIntObservable.subscribe(myIntObserver);
    

    Code above would not work as you probably think.

    Operators observeOn, subscribeOn are not designed to change internal observable state. They are returning new observable with desired behaviour.

    To accomplish observing on io() thread and subscribing your observable on mainThread() you need to change the code:

    //Connect my Observable to the observer.
    myIntObservable = myIntObservable.observeOn(Schedulers.io());
    myIntObservable = myIntObservable.subscribeOn(AndroidSchedulers.mainThread());
    myIntObservable.subscribe(myIntObserver);
    

    Or use (preferred) chaining:

    //Connect my Observable to the observer.
    myIntObservable
        .observeOn(Schedulers.io());
        .subscribeOn(AndroidSchedulers.mainThread());
        .subscribe(myIntObserver);
    

    For the code same as yours, calling .subscribe() on not changed Observable will result in subscribing and observing on the same thread from which you call .subscribe() (most likely from main thread).

    Keep in mind you need to dispose observable once the work is finished.

    When it comes to mapping problem - map() operator changes one type of Observable<A> into another type of observable Observable<B>.

    If you'd like to end up with String objects converted from Integer objects you need to use - as a source of data - your original myIntObservable:

    myStringObservable = myIntObservable
        (...)    
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer query){
                return "String Observable result == " + query;
            }
        });
    

    In above example myIntObservable will emit Integer objects (as expected in .apply(Integer query) method. Then .map() operator will create another Observable of type Observable<String> you can assign to myStringObservable (or do whatever you want from here).

    Then, using myStringObservable you can subscribe to its events:

    myStringObservable.subscribe(myStringObserver)
    

    Again, please remember to dispose Observable when work is done.

    Please also note that you should:

    • .observeOn() as soon as possible for current piece of work,
    • .subscribeOn() as late as possible (you don't want to continue io() or computation() operations on your main thread, right?).

    Hint at the end: consider using lambdas with RxJava. Using all these anonymous classes (new Function() etc.) will make your code hard to read in the nearest future.