kotlin, rx-java, kotlin-coroutines, rx-java3

How to convert a suspend function to an RX Single (or Completable)?


We're refactoring our project from RX to Kotlin Coroutines, but not in one go, so we need our project to use both for some time.

Right now we have a lot of methods that use an RX Single as the return type, like this, because they are heavy, long-running operations such as API calls.

fun foo(): Single<String> // Heavy, long-running operation

We want it to be like this:

suspend fun foo(): String // The same heavy, long-running operation

Where we call this method, we still want to use RX.

We have been doing this:

foo()
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

Now, how should I convert my suspend fun into a Single that I can work with?

Is this a good idea?

Single.fromCallable {
    runBlocking {
        foo() // This is now a suspend fun
    }
}
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

Solution

  • I haven’t spent much time reasoning it through, but I think your approach ties up one additional thread for the duration of the suspend function, because runBlocking blocks the thread it is called on until foo() returns.

    The official kotlinx-coroutines-rx3 library provides conversions between coroutines and RxJava 3. The rxSingle function is the relevant one here (rxCompletable is its counterpart for Completable). Under the hood, it runs the suspend function in a coroutine, without blocking an additional thread.

    I haven’t used this myself, so you may want to test it to be sure, but I don’t think you need subscribeOn immediately after this, because the actual work is done in a coroutine. Your foo function, being a suspend function, should already switch to Dispatchers.IO internally if it performs blocking IO.

    rxSingle { foo() }
        .map { ... }
        .subscribe { ... }
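
For completeness, here is a minimal sketch of how the bridge might look end to end. It assumes the kotlinx-coroutines-rx3 and RxJava 3 artifacts are on the classpath. The bodies of foo() and bar(), and the wrapper names fooSingle() and barCompletable(), are hypothetical placeholders, not code from the question. The sketch also assumes foo() moves its own blocking work onto Dispatchers.IO, as described above.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx3.rxCompletable
import kotlinx.coroutines.rx3.rxSingle
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the heavy operation from the question.
// It switches to Dispatchers.IO itself, so callers need no subscribeOn.
suspend fun foo(): String = withContext(Dispatchers.IO) {
    delay(100) // placeholder for the real API call
    "result"
}

// Hypothetical suspend function that returns nothing.
suspend fun bar() {
    delay(100) // placeholder for real work
}

// Bridges for call sites that still use Rx during the migration.
fun fooSingle(): Single<String> = rxSingle { foo() }

fun barCompletable(): Completable = rxCompletable { bar() }

fun main() {
    // blockingGet/blockingAwait are used only so this demo waits for the
    // result; real call sites would keep using map { ... }.subscribe { ... }.
    val value = fooSingle()
        .map { it.uppercase() }
        .blockingGet()
    println(value)

    barCompletable().blockingAwait()
}
```

Both builders also accept a coroutine context as the first argument, for example rxSingle(Dispatchers.IO) { foo() }, which may be useful if a suspend function does not choose its own dispatcher. Disposing the subscription cancels the underlying coroutine.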