Search code examples
androidrx-androidrx-java2networkonmainthread

rxjava2: Using concatenated completables and observing them in IO thread


At first, I know that network operations shouldn`t be called from Main thread. Thats why I am observing completables on Schedulers.io()!

I am trying to concat two completable. Both completable use network, thats why i subscribe on Schedulers.io(). If i am using concatWith(or andThen) code fails with NetworkOnMainThreadException. Here is kotlin code:

val singleSubject = SingleSubject.create<String>(); 
completalbe1.concatWith(completable2)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            singleSubject.onSuccess("ok")
        }, { error ->
            Log.e(tag, error.message, error)//here i got exception
            singleSubject.onError(error) 
        })
return singleSubject

If i rewrite code without completable chaining - all is ok. Here is working code:

val singleSubject = SingleSubject.create<String>(); 
completable1
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            completable2
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe({
                    singleSubject.onSuccess("ok")
                }, { error ->
                    Log.e(tag, error.message, error)
                    singleSubject.onError(error)
                })
        }, {error ->
            Log.e(tag, error.message, error)
            singleSubject.onError(error)
        })
return singleSubject

I wonder why first snippet don`t work but second does?

UPD1:Here is stacktrace:

        android.os.NetworkOnMainThreadException
 at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1273)
 at libcore.io.BlockGuardOs.recvfrom(BlockGuardOs.java:249)
 at libcore.io.IoBridge.recvfrom(IoBridge.java:549)
 at java.net.PlainSocketImpl.read(PlainSocketImpl.java:481)
 at java.net.PlainSocketImpl.access$000(PlainSocketImpl.java:37)
 at java.net.PlainSocketImpl$PlainSocketInputStream.read(PlainSocketImpl.java:237)
 at okio.Okio$2.read(Okio.java:139)
 at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
 at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
 at okhttp3.internal.connection.RealConnection.isHealthy(RealConnection.java:498)
 at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:133)
 at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
 at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:211)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
 at okhttp3.RealCall.execute(RealCall.java:69)
 at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
 at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41)
 at io.reactivex.Observable.subscribe(Observable.java:10955)
 at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
 at io.reactivex.Observable.subscribe(Observable.java:10955)
 at io.reactivex.internal.operators.observable.ObservableIgnoreElementsCompletable.subscribeActual(ObservableIgnoreElementsCompletable.java:31)
 at io.reactivex.Completable.subscribe(Completable.java:1664)
 at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89)
 at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65)
 at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
 at com.catalyst.opti.AppManager$transferImage$1$subscribe$1.onStateChanged(AppManager.kt:323)
 at com.amazonaws.mobileconnectors.s3.transferutility.TransferStatusUpdater$1.run(TransferStatusUpdater.java:172)
 at android.os.Handler.handleCallback(Handler.java:742)
 at android.os.Handler.dispatchMessage(Handler.java:95)
 at android.os.Looper.loop(Looper.java:154)
 at android.app.ActivityThread.main(ActivityThread.java:5527)
 at java.lang.reflect.Method.invoke(Native Method)
 at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:739)
 at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:629)

UPD2:

completable1 is a function uploading file to AWS S3:

private fun transferImage(imageName: String, image: File): Completable {
    return Completable.create(object : CompletableOnSubscribe {
        override fun subscribe(e: CompletableEmitter) {
            val transferObserver = transferUtility.upload("some", imageName, image)
            transferObserver.setTransferListener(object : TransferListener {
                override fun onProgressChanged(id: Int, bytesCurrent: Long, bytesTotal: Long) {
                    Log.i(tag, "bytesCurrent: $bytesCurrent, bytesTotal: $bytesTotal")
                }

                override fun onStateChanged(id: Int, state: TransferState?) {
                    if (state == TransferState.COMPLETED) {
                        e.onComplete()
                    }
                }

                override fun onError(id: Int, ex: java.lang.Exception) {
                    Log.d(tag, "error transfer s3: ${ex.message}", ex)
                    e.onError(ex)
                }
            })
        }
    });
}

completable2 is retrofit2 call:

@POST("some")
    fun verifyLocation(@Header(AUTH_TOKEN_HEADER) authToken: String, @Body 
verifyLocation: VerifyLocation): Completable

Solution

  • I'd guess transferObserver.setTransferListener calls the callback on the main thread which then will subscribe to completable2 on the main thread as well. You have to apply subscribeOn(Schedulers.io()) to completable2, just like in your other example.

    val singleSubject = SingleSubject.create<String>(); 
    completalbe1.subscribeOn(Schedulers.io())
        .concatWith(completable2.subscribeOn(Schedulers.io())) // <-----------------------
        .observeOn(Schedulers.io())
        .subscribe({
            singleSubject.onSuccess("ok")
        }, { error ->
            Log.e(tag, error.message, error)//here i got exception
            singleSubject.onError(error) 
        })
    
    return singleSubject
    

    subscribeOn affects subscription (side) effects but your completalbe1 has an observation effect when it calls onComplete on the main thread.