Search code examples
androidrx-javarx-androidreactivex

Why doesn't doOnNext() get called?


Neither onNext() nor onCompleted() get called for my subscriber below. I've tried implementing the subscriber via doOnNext()/doOnTerminate(). I also tried doAfterTerminate(). I've also tried explicitly defining a subscriber and neither onNext() nor onCompleted() got called for it.

According to RxJS reduce doesn't continue, it's the reduce() that's not terminating so I tried adding the take(1) but that didn't work. In the same stackoverflow question someone said the problem might be my stream never closes. Aside from take(1), maybe there is some other way I should close the stream, but I don't understand ReactiveX well enough yet.

According to Why is OnComplete not called in this code? (RxAndroid), it could be that the original stream in the series doesn't terminate. But I don't see why that would matter if I'm calling take(1) which I think is supposed to emit a termination signal.

Basically, why doesn't the following line get executed?

System.out.println("doOnNext map.size()=" + map.size());

Even though the following line of code gets executed 98 times:

map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest calling code

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

Solution

  • I found a way to get the doOnTerminate() and doOnNext() to be called. It was to place the take(1) statement higher up in the stream processing. I got this idea based on the comment from @yosriz . I wish I understood why this fixed the problem, but I don't because I thought a termination signal would just get propagated through, but I guess there is a lot more I need to learn about ReactiveX / reactive functional programming / etc..

    JsonObjectObservableRequest calling code

    import com.android.volley.Request;
    import com.android.volley.RequestQueue;
    ...
        JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
        Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
    
    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            // .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable() 
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();
    
    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
    

    Furthermore, it works also if I put take(1) as the very first operation, before the first map() operation (the one that calls parseIdJSON()). However, if I put take(1) as the operation just before reduce(), it only partially works; the good thing is doOnNext() gets called but the bad thing is the map.size() is always 1 when it should be 98 for my data set.