Search code examples
rx-java

RxJava merge subscriber gets results only from first observable


I am trying to understand how rxjava merge works. So here is simple code that should merge results from 2 observables and emit to subscriber

    Observable.merge(getObservable(), getTimedObservable())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override public void call(final String s) {
                        Log.i("test", s);
                    }
                });

    private Observable<String> getTimedObservable() {
        return Observable.interval(150, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, String>() {
                    @Override public String call(final Long aLong) {
                        Log.i("test", "tick thread: " + Thread.currentThread().getId());
                        return String.valueOf(aLong);
                    }
                });
    }

    public Observable<String> getObservable() {
        return  Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(final Subscriber<? super String> subscriber) {
                try {
                    Log.i("test", "simple observable thread: " + Thread.currentThread().getId());
                    for (int i = 1; i <= 10; i++) {
                        subscriber.onNext(String.valueOf(i * 100));
                        Thread.sleep(300);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }

I was expected that merged result in subscriber would be like

100 0 1 200 2 300 4 5 400

or something like that, however, the actual result is:

 test: simple observable thread: 257 
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13

It looks like Thread.sleep in first Observable blocks emitting in second observable, but I don't understand how. Can someone explain it?


Solution

  • merge will subscribe to both observables at the same time. The observable which will be subscribed first, will produce values on calling thread. Because the calling thread is blocked by observable1, observable2 can not produces values. SubscribeOn will only say where the subscription will happen. Lets say observable starts producing values on main-1. Every value downstreams will be on the same thread. No concurrency is happening.

    If you want to achive concurrency, you have to say for each observable, where the subscription has to happen. So lets say we have Observables.merge with two observables. Observable1 and Observable2 have subscribeOn with some Threadpool. Each observable will generate values on given thread of subscribeOn. You achieved concurrency.

    Plase have a look at the edited output at:

    @Test
    public void name() throws Exception {
        Subscription subscribe = Observable.merge(getObservable(), getTimedObservable())
                //.observeOn(AndroidSchedulers.mainThread())
                .subscribe(s -> {
    
                    System.out.println("subscription " + s);
                    //Log.i("test", s);
                });
    
    
        Thread.sleep(5_000);
    }
    
    private Observable<String> getTimedObservable() {
        return Observable.interval(150, TimeUnit.MILLISECONDS)
                .map(aLong -> {
                    System.out.println("getTimedObservable: " + Thread.currentThread().getId());
    
                    //Log.i("test", "tick thread: " + Thread.currentThread().getId());
                    return String.valueOf(aLong);
                }).subscribeOn(Schedulers.io());
    }
    
    private Observable<String> getObservable() {
        return Observable.<String>create(subscriber -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    System.out.println("getObservable: " + Thread.currentThread().getId());
                    subscriber.onNext(String.valueOf(i * 100));
                    Thread.sleep(300);
                }
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }).subscribeOn(Schedulers.io());
    }