Tags: kotlin, rx-java2, kotlin-coroutines

How to convert Flow into Flowable?


I've just added

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"

to the project. I have suspend fun foo(): Flow<Bar> in class A (from an external package).

I need to get a Flowable<Bar> to use from Java. I'd like to use an extension fun A.fooRx(): Flowable<Bar> if possible.


Solution

  • You have to sneak the returned Flow<Bar> out of the coroutine in Kotlin:

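    // SomeSuspendAPI.kt
    // -----------------

    import io.reactivex.Flowable
    import java.util.concurrent.CompletableFuture
    import kotlinx.coroutines.ExperimentalCoroutinesApi
    import kotlinx.coroutines.GlobalScope
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.rx2.asFlowable

    // the method to convert
    suspend fun <T> Flow<T>.foo() : Flow<Int> {
        return flow { emit(0) }
    }

    // Calls the suspending foo() inside a coroutine and hands the converted
    // Flowable (or the failure) to the caller through a CompletableFuture
    @ExperimentalCoroutinesApi
    fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
        val self = this
        val future = CompletableFuture<Flowable<Int>>()
        GlobalScope.launch {
            try {
                future.complete(self.foo().asFlowable())
            } catch (ex: Throwable) {
                future.completeExceptionally(ex)
            }
        }
        return future
    }

    // Demo purposes
    fun <T> just(v: T) = flow { emit(v) }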
    

    Then you can use that within Java:

    public class UseFoo {
        public static void main(String[] args) throws Exception {
            SomeSuspendAPIKt.fooRx(
                    SomeSuspendAPIKt.just(1)
            )
            .thenAccept(flowable -> flowable.subscribe(System.out::println))
            .join();
        }
    }
    

    Edit 1:

    You can, of course, move some of that code back to the Kotlin side:

    // Needs one more import at the top of the file:
    // import io.reactivex.subjects.SingleSubject
    fun <T> Flow<T>.fooRx2() : Flowable<Int> {
        val self = this
        val subject = SingleSubject.create<Flowable<Int>>()
        GlobalScope.launch {
            try {
                subject.onSuccess(self.foo().asFlowable())
            } catch (ex: Throwable) {
                subject.onError(ex)
            }
        }
        return subject.flatMapPublisher { it }
    }
    

    Then, from Java:

    public class UseFoo {
        public static void main(String[] args) throws Exception {
            SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
                    .blockingSubscribe(System.out::println);
        }
    }
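
    The manual SingleSubject wiring in fooRx2 can probably be replaced by the rxSingle builder from kotlinx-coroutines-rx2. The following is a minimal sketch, not a drop-in replacement: fooRx3 is a hypothetical name, and unlike fooRx2 (which launches the coroutine as soon as it is called) this version calls foo() once per subscription.

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx2.asFlowable
import kotlinx.coroutines.rx2.rxSingle

// Same demo function as in the answer above.
suspend fun <T> Flow<T>.foo(): Flow<Int> = flow { emit(0) }

// fooRx3 is a hypothetical name. rxSingle runs the suspending call when the
// Single is subscribed and routes any exception to onError, so no explicit
// GlobalScope.launch / try-catch / SingleSubject is needed.
@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx3(): Flowable<Int> {
    val source = this
    return rxSingle { source.foo() }      // Single<Flow<Int>>
        .flatMapPublisher { it.asFlowable() }
}

@ExperimentalCoroutinesApi
fun main() {
    // prints 0
    flowOf(1).fooRx3().blockingSubscribe { println(it) }
}
```

    From Java the call looks the same as for fooRx2: SomeSuspendAPIKt.fooRx3(SomeSuspendAPIKt.just(1)).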
    

    Edit 2:

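    You can generalize this with a transformation on the Kotlin side that accepts any suspending function. When it is called from Java, the lambda receives the continuation object as an extra parameter, which you pass along to the suspend function: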

    fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
        val self = this
        val subject = SingleSubject.create<Flowable<R>>()
        GlobalScope.launch {
            try {
                val r = fn(self).asFlowable()
                subject.onSuccess(r)
            } catch (ex: Throwable) {
                subject.onError(ex)
            }
        }
        return subject.flatMapPublisher { it }
    }
    
    Then, from Java:

    public class UseFoo {
        public static void main(String[] args) throws Exception {
    
            SomeSuspendAPIKt.transformAsync(
                    SomeSuspendAPIKt.just(1),
                    (source, cont) -> SomeSuspendAPIKt.foo(source, cont)
            )
            .blockingSubscribe(System.out::println);
        }
    }
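
    For the exact shape asked in the question (a member suspend fun foo(): Flow<Bar> on class A, exposed as A.fooRx(): Flowable<Bar>), one possible sketch wraps the suspending call in a flow builder and converts the result. The A and Bar definitions below are hypothetical stand-ins for the external types, and the sketch assumes a kotlinx-coroutines version where emitAll and asFlowable are available (both were still marked experimental in 1.3.x).

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.rx2.asFlowable

// Hypothetical stand-ins for the external types named in the question.
data class Bar(val id: Int)

class A {
    suspend fun foo(): Flow<Bar> = flow {
        emit(Bar(1))
        emit(Bar(2))
    }
}

// The flow builder's block is suspending, so foo() can be called inside it.
// foo() runs only when the resulting Flowable is subscribed.
@ExperimentalCoroutinesApi
fun A.fooRx(): Flowable<Bar> = flow { emitAll(foo()) }.asFlowable()

@ExperimentalCoroutinesApi
fun main() {
    // prints [Bar(id=1), Bar(id=2)]
    println(A().fooRx().toList().blockingGet())
}
```

    If this extension lived in a file named AExtensions.kt (an assumed name), Java code would call it as AExtensionsKt.fooRx(a) and get a plain Flowable<Bar> with no continuation parameter to handle.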