Search code examples
kotlinkotlin-coroutineskotlin-floweventstoredb

Using emit to build a Kotlin flow runs indefinitely and doesnt complete


I use a java library with which I can subscribe to events from my eventstore db.

I can create a subscription according to the following SubscirptionListener

public abstract class SubscriptionListener {
    public void onEvent(Subscription subscription, ResolvedEvent event) {
    }

    public void onError(Subscription subscription, Throwable throwable) {
    }

    public void onCancelled(Subscription subscription) {
    }
}

I would like to emit ResolvedEvents as part of a flow each time the subscription is triggered. However, the call to emit doesn't finish.

    fun flowSubscriptionListener(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
        onError: (subscription: Subscription?, throwable: Throwable) -> Unit = { _, _ -> },
        onCancelled: (subscription: Subscription) -> Unit = { _ -> }
    ): Flow<ResolvedEvent> {
        return flow {
            val listener = object : SubscriptionListener() {
                override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                    logger.info {
                        "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}"
                    }

                    runBlocking {
                        logger.info { "emitting event" }
                        [email protected](event)
                        logger.info { "Event emitted" }
                    }
                }

                override fun onError(subscription: Subscription?, throwable: Throwable) {
                    logger.error {
                        "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                    }
                    onError(subscription, throwable)
                }

                override fun onCancelled(subscription: Subscription) {
                    logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                    onCancelled(subscription)
                }
            }
            client.subscribeToStream(streamName, listener).await()
        }.buffer(10)
    }

I have a sample setup where I await a flow with three events

flowSubscriptionListener(
            streamName = "SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19",
        ).map {
            it.event.eventType
        }.collect {
            println(it)
        }

However, I receive no events at all. The console output shows me that invocation of emit never terminates.

[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - Received event 0@SampleTournament-adb517b8-62e9-4305-b3b6-c1e7193a6d19
[grpc-default-executor-1] INFO lib.eventstoredb.wrapper.EskWrapperEsdb - emitting event

I am expecting the logging of "Event emitted"


Solution

  • In order to wrap callback-based API, you should use callbackFlow instead. It supports concurrent emissions, which I think is likely your problem here.

    Also, it will properly handle the cancellation of the subscription when the flow itself is cancelled (via awaitClose()).

    Here is one way to do it:

    fun EventStoreDBClient.flowSubscription(
        streamName: String,
        options: SubscribeToStreamOptions = SubscribeToStreamOptions.get(),
    ): Flow<ResolvedEvent> = callbackFlow {
        val listener = object : SubscriptionListener() {
            override fun onEvent(subscription: Subscription, event: ResolvedEvent) {
                logger.info { "Received event ${event.originalEvent.streamRevision}@${event.originalEvent.streamId}" }
                logger.info { "Emitting event" }
                trySendBlocking(event)
                logger.info { "Event emitted" }
            }
    
            override fun onError(subscription: Subscription?, throwable: Throwable) {
                logger.error {
                    "Received error with message: ${throwable.message ?: "No message"} on subscription ${subscription?.subscriptionId}"
                }
                close(throwable)
            }
    
            override fun onCancelled(subscription: Subscription) {
                logger.debug { "Subscription ${subscription.subscriptionId} cancelled" }
                close()
            }
        }
        val subscription = subscribeToStream(streamName, listener, options).await()
        awaitClose {
            subscription.stop()
        }
    }.buffer(10)
    

    Note that I also converted it to an extension function on EventStoreDBClient, which seems appropriate here. And I removed the error/cancellation callbacks because Flow already handles those (you can put them back if you need them)