
Stitch Data from One Observable into another Observable with RxJava


I have two Observable streams of type User and Image:

final Observable<List<Image>> images = mNetworkingService.getImages(getToken(context));

final Observable<List<User>> users = mNetworkingService.getUsers()
.onErrorReturn(new Func1<Throwable, List<User>>() {
    @Override
    public List<User> call(Throwable throwable) {
        return Collections.emptyList();
    }
});

I want to set each Image object's fullName field to the matching user's first and last name, matching on the userId that both the Image and User objects contain.

This is what I have so far, but I am not sure zip is the best option. I am close, but I get an error when I try to return the collection of image objects: the compiler expects a List<Image> but sees a single Image. If there is a better or cleaner way to do this correlation and assignment, I would be interested.

return Observable.zip(images, users, 
new Func2<List<Image>, List<User>, Observable<List<Image>>>() {
    @Override
    public Observable<List<Image>> call(final List<Image> images, List<User> users) {
        for (Image image : images) {
            for (User user : users) {
                if (image.userId.equals(user.userId)) {
                    image.fullName = user.getFullName();
                }
            }
        }
        return Observable.from(images); // Error: Expecting List<Image> but got Image.
    }
}).flatMap(new Func1<Observable<List<Image>>, Observable<List<Image>>>() {
    @Override
    public Observable<List<Image>> call(Observable<List<Image>> it) {
        return it;
    }
});

Solution

  • zip() is the right option if you want to act once both requests have completed, using both results.

    from() takes an Iterable<T> and flattens it into an Observable<T> that emits each item separately. That means Observable.from(images) returns Observable<Image>, whereas you declared the zipper function as returning Observable<List<Image>>. That is the mismatch behind the compile error.

    In any case, you shouldn't return an Observable from the zipper function. The zipper is only expected to combine the items it receives into a single object.
    For each pair of emissions (one from each source), zip() runs the zipper and you get onNext() with the resulting combined object. In your case, each onNext() would deliver an Observable, which is probably not what you want.

    To sum up, you probably want something like this:

    Observable.zip(images, users,
            // The zipper returns a plain List<Image>, not an Observable.
            new Func2<List<Image>, List<User>, List<Image>>() {
                @Override
                public List<Image> call(final List<Image> images, List<User> users) {
                    // Copy each matching user's full name onto the image.
                    for (Image image : images) {
                        for (User user : users) {
                            if (image.userId.equals(user.userId)) {
                                image.fullName = user.getFullName();
                            }
                        }
                    }
                    return images;
                }
            })
            .subscribe(new Action1<List<Image>>() {
                @Override
                public void call(List<Image> images) {
                    // Do something with the list of images combined with user data.
                }
            });