Search code examples
javaandroidretrofitrx-java2

Problems with zip function in rxjava2


It seems the zip function doesnt do anything.The LOG of requests with TAG INSIDE prints out the observables but is empty OUTSIDE.In the zip function LOG calls dont do anything. getPosts returns the list of ids. I`m a beginner in Android so maybe I bit more than I can chew with Rxjava,but apparently this is the best solution.Essentially, getPosts returns a list of ids which I should use to compose further requests in getStory.If there anything simpler I`m eager to hear it.Thanks.

MainActivty

package com.example.hackernews;

import androidx.appcompat.app.AppCompatActivity;
import androidx.recyclerview.widget.RecyclerView;

import android.annotation.SuppressLint;
import android.os.Bundle;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

import static java.lang.Math.min;

public class MainActivity extends AppCompatActivity {
    private RecyclerView mRecyclerView;
    private RecyclerView.Adapter mAdapter;
    private RecyclerView.LayoutManager mLayoutManager;
    private  List<DataResponse> dataResponses;
    private Observable<List<Integer>> ids;
    private List<Observable<DataResponse>> requests = new ArrayList<>();

    @SuppressLint("CheckResult")
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);




//        mRecyclerView = findViewById(R.id.recyclerView);
//        mRecyclerView.setHasFixedSize(true);
//        mLayoutManager = new LinearLayoutManager(getApplicationContext());
//        mRecyclerView.setLayoutManager(mLayoutManager);



        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(interceptor)
                .build();

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://hacker-news.firebaseio.com/v0/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .client(client)
                .build();

        HackerNewsApi hackerNewsApi = retrofit.create(HackerNewsApi.class);

        ids = hackerNewsApi.getPosts();

        ids.subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(id -> {
                    for (Integer i : id) {
                        requests.add(hackerNewsApi.getStory(i));
                    }
                    Log.e("onSubscribe", "INSIDE " + requests);
                }, Throwable::printStackTrace);
        Log.e("onSubscribe", "OUTSIDE " + requests);

        Observable.zip(
                requests,
                new Function<Object[], Object>() {
                    @Override
                    public Object apply(Object[] objects) throws Exception {
                        // Objects[] is an array of combined results of completed requests

                        Log.e("onSubscribe", "YOUR OBJECTS ARE HERE: " + objects);
                        // do something with those results and emit new event
                        return new Object();
                    }
                })
                // After all requests had been performed the next observer will receive the Object, returned from Function
                .subscribe(
                        // Will be triggered if all requests will end successfully (4xx and 5xx also are successful requests too)
                        new Consumer<Object>() {
                            @Override
                            public void accept(Object o) throws Exception {
                                //Do something on successful completion of all requests
                                Log.e("onSubscribe", "YOUR OBJECTS ARE HERE: " + o);
                            }
                        },

                        // Will be triggered if any error during requests will happen
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable e) throws Exception {
                                //Do something on error completion of requests
                            }
                        }
                );


//            mRecyclerView = findViewById(R.id.recyclerView);
//            mRecyclerView.setHasFixedSize(true);
//            mLayoutManager = new LinearLayoutManager(this);
//            Log.e("onSubscribe", "YOUR DATA IS HERE: " + dataResponses);
//            mAdapter = new ExampleAdapter(dataResponses);
//
//
//            mRecyclerView.setLayoutManager(mLayoutManager);
//            mRecyclerView.setAdapter(mAdapter);


    }

}

HackerNewsApi

package com.example.hackernews;

import java.util.List;

import io.reactivex.Observable;
import retrofit2.Call;
import retrofit2.http.GET;
import retrofit2.http.Path;

public interface HackerNewsApi {

    @GET("askstories.json?print=pretty")
    Observable<List<Integer>> getPosts();

    @GET("item/{id}.json?print=pretty")
    Observable<DataResponse> getStory(@Path("id") Integer id);
}

Solution

  • You create a list of story retrieving observables in the background and concurrently try to use the list being constructed on the main thread.

    Why not simply compose over the getPosts() like this?

     hackerNewsApi.getPosts()
     .subscribeOn(Schedulers.io())
     .flatMapIterable(posts -> posts)
     .flatMap(post -> hackerNewsApi.getStory(post))
     .toList()
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(allStories -> { /* ... */ }, error -> { /* ... */ });
    

    flatMapIterable unrolls your initial list of posts and toList recombines them in some order.