Search code examples
androidretrofitrx-javachainingflatmap

RxJava flatmap chaining requests


i am using Retrofit with RxJAva for an app that gets Rss Feeds, but the rss doesn't contain all the informations so i use jsoup to parse every item link, to retrieve the image and the article's description. now i am using it this way:

public Observable<Rss> getDumpData() {
    return newsAppService.getDumpData()
            .flatMap(rss -> Observable.from(rss.channel.items)
            .observeOn(Schedulers.io())
            .flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
            .observeOn(Schedulers.io())
            .map(document -> document.select("div[itemprop=image] > img").first())
                    .doOnNext(element -> item.image = element.attr("src"))
            )))
            .defaultIfEmpty(rss)
            .ignoreElements()
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread());
}

and i am getting an error on this line: defaultIfEmpty(rss) it doesn't recognize rss of the flatmap. and when i move the defaultIfEmpty(rss) in flatmap brackets i have another error saying that the return type must be changed to Element. is their any solution ?


Solution

  • first of all you need to get rid of all the concurrency with observeOn and use subscribeOn.

    .observeOn(Schedulers.io())
    

    Please consider using observeOn with AndroidScheduler if want to sync back data from another thread back to the event-loop. Normally you would use observeOn before subscribing to a observable in order to sync back to ui-loop and change ui-information.

    .observeOn(AndroidSchedulers.mainThread())
    

    Secondly it is not recommended to mutate objects in the pipeline. You should return a new object very time.

    .doOnNext(element -> item.image = element.attr("src"))
    

    I tried to refactor your solution under consideration of the first two points. I am using RxJava2-RC5

    The flatMap operator has many overloades. One of them provides a function to zip together the incoming value and the created value.

    Observable<Rss> rssItemObservable = newsService.getDumpData()
                    .flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()),
                            (r, rItemList) -> {
                                Rss rInterim = new Rss();
                                rInterim.items = rItemList;
                                return rInterim;
                            });
    

    Helping-method for retrieving information for each item in Rss. Please consider using the overload with maxConcurrency, because on default it will subscribe to every stream at once. Therefore flatMap would create many http-requests.

    private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
            return Observable.fromIterable(rss.items)
                    .flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> {
                        RssItem item = new RssItem();
                        printCurrentThread("merge1");
                        item.image = img;
                        item.link = rItem.link;
                        return item;
                    }).toList().toObservable();
    }
    

    Helping-method for retrieving the image url. Returning observable is not opinionated about concurrency. If an error occurs, an empty string will be returned as default value.

    private Observable<String> getImageUrl(String link) {
               return Observable.fromCallable(() -> Jsoup.connect(link).get())
                    .map(document -> document.select("div[itemprop=image] > img").first())
                    .map(element -> element.attr("src"))
                    .onErrorResumeNext(throwable -> {
                        return Observable.just("");
                    });
    }
    

    You may look at the full example at github.gist: https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e