Tags: kotlin, reactive-programming, kotlin-extension, kotlin-coroutines

In Kotlin, how do I convert "CompletableFuture<Optional<T>>" to "Flow<T?>"?


I'm trying to convert a CompletableFuture<Optional<T>> to a Flow<T?>. The extension function I'm trying to write is

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
    this.toMono().map { (if (it.isPresent) it.get() else null) }.asFlow()

but it fails to compile because, as far as I can tell from its definition, asFlow() isn't defined for nullable element types.

So, how do I convert CompletableFuture<Optional<T>> to Flow<T?>?

Edit 1:

Here's what I've come up with so far. Feedback appreciated.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import java.util.Optional
import java.util.concurrent.CompletableFuture

fun <T> Optional<T>.orNull(): T? = orElse(null)

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> = flowOf(this.join().orNull())

FYI, in my case, which is using Axon's Kotlin extension queryOptional, I can now write this:

inline fun <reified R, reified Q> findById(q: Q, qgw: QueryGateway): Flow<R?> {
    return qgw.queryOptional<R, Q>(q).asFlowOfNullable()
}

I'll hold off for a while on posting the above pattern as an answer, to allow for feedback.

Edit 2: Since it was pointed out below that asFlowOfNullable in Edit 1 would block the thread, I'm going with this from @Joffrey for now:

fun <T> Optional<T>.orNull(): T? = orElse(null)

fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> = thenApply { it.orNull() }.asDeferred()
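
For illustration, here is a minimal sketch of how the Edit 2 version behaves compared with the join()-based one from Edit 1. The main function and the manually completed future are assumptions made up for the demo (they stand in for a real query result), and it needs the kotlinx-coroutines future integration on the classpath.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.runBlocking
import java.util.Optional
import java.util.concurrent.CompletableFuture

fun <T> Optional<T>.orNull(): T? = orElse(null)

fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> =
    thenApply { it.orNull() }.asDeferred()

fun main() = runBlocking {
    // Hypothetical stand-in for a query result that has not arrived yet.
    val future = CompletableFuture<Optional<String>>()

    // Returns immediately; nothing waits on the future at this point.
    val deferred = future.asDeferredOfNullable()
    println(deferred.isCompleted) // false

    // Completing the future with an empty Optional completes the Deferred with null.
    future.complete(Optional.empty())
    println(deferred.await()) // null
}
```

With the Edit 1 version, the call to join() would have waited on the calling thread at the point where the extension is invoked, before any Flow was even returned.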

Edit 3: credit to both @Tenfour04 & @Joffrey for their helpful input. :)


Solution

  • To use the below extensions, you need the jdk8 coroutines library:

    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.5.0"
    

    I'm not sure where the asFlow() function you're using comes from, but here's a way that I think would work without it. A Flow of a single item seems a little odd to me: it could just be a suspend function or, if you need an object to pass around, a Deferred, which is intended for returning a single result and is therefore more analogous to a Future than a Flow is.

    fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
        flow { emit(await().orElse(null)) }
    

    As a suspend function:

    suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? =
        await().orElse(null)
    

    As a deferred:

    fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
        thenApply { it.orElse(null) }.asDeferred()
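
To see the three variants side by side, here is a small self-contained sketch. The main function and the two pre-completed futures are made up for the demo and are not part of the question's code; it assumes the coroutines future integration (the jdk8 artifact above) is on the classpath.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import java.util.Optional
import java.util.concurrent.CompletableFuture

// Cold flow: the future is only awaited once the flow is collected.
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
    flow { emit(await().orElse(null)) }

// Suspends (rather than blocks) until the future completes.
suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? =
    await().orElse(null)

// Unwraps the Optional on the future side, then bridges to a Deferred.
fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
    thenApply { it.orElse(null) }.asDeferred()

fun main() = runBlocking {
    // Hypothetical inputs: one future holding a value, one holding an empty Optional.
    val present = CompletableFuture.completedFuture(Optional.of("hello"))
    val empty = CompletableFuture.completedFuture(Optional.empty<String>())

    println(present.asFlowOfNullable().single()) // hello
    println(empty.awaitNullable())               // null
    println(empty.asDeferredNullable().await())  // null
}
```

All three return the same value for a given future; the choice is mostly about what the caller wants to hold on to (a stream, a plain suspending call, or a handle to a single pending result).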