I'm trying to convert a CompletableFuture<Optional<T>>
to a Flow<T?>
. The extension function I'm trying to write is
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
this.toMono().map { (if (it.isPresent) it.get() else null) }.asFlow()
but it fails because asFlow()
doesn't exist for nullable types, AFAICT based on its definition.
So, how do I convert CompletableFuture<Optional<T>>
to Flow<T?>
?
Edit 1:
Here's what I've come up with so far. Feedback appreciated.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import java.util.Optional
import java.util.concurrent.CompletableFuture
fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> = flowOf(this.join().orNull())
FYI, in my case, which is using Axon's Kotlin extension queryOptional
, I can now write this:
inline fun <reified R, reified Q> findById(q: Q, qgw: QueryGateway): Flow<R?> {
return qgw.queryOptional<R, Q>(q).asFlowOfNullable()
}
I'll defer for a while creating a comment with the above pattern as the answer to allow for feedback.
Edit 2:
Since it was pointed out below that asFlowOfNullable
in Edit 1 would block the thread, I'm going with this from @Joffrey for now:
fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> = thenApply { it.orNull() }.asDeferred()
Edit 3: credit to both @Tenfour04 & @Joffrey for their helpful input. :)
To use the below extensions, you need the jdk8
coroutines library:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$1.5.0"
I'm not sure where the asFlow()
function comes from that you're using, but here's a way I think would work without it. It seems a little odd to me to have a Flow of a single item, because it could just be a suspend
function or if you need it as an object to pass around, a Deferred, which is intended for returning a single result and is therefore more analogous to a Future than a Flow.
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
flow { emit(await().orElse(null)) }
As a suspend function:
suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? =
await().orElse(null))
As a deferred:
fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
thenApply { it.orElse(null) }.asDeferred()