Search code examples
kotlincallbackkotlin-coroutinesdispatcher

Convert callback to co-routine with constraint on main thread for initial call


I would like to convert a callback to co-routine, and the SDK states that the API call needs to be made on the main thread. The solution works, but I am not confident that it is correct in principle.

It looks like this roughly:

   override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> {

        return suspendCancellableCoroutine { continuation ->

            val observer =
                object : Observer<RegisterResponse<Profile?>> {
               fun onNext(t: Profile) {
                 continuation.resume(Success(t))
           }

     CoroutineScope(Dispatchers.Main).launch {
         userManager.register(email, observer)
       }
}
}

It seems to me that the SDK wants to invoke the observer callback on the Main thread, but my process is triggered in a view model scope on the IO thread (to avoid blocking main). So the observer I guess is in practice running on the IO thread.

Thoughts on how to approach this?


Solution

  • Just to get this out of the way, if this library provided you an ObservableSource reference instead of making you pass it an Observer, you could use awaitFirst() on it, which would of course be simpler than implementing this yourself.


    Avoid doing this: CoroutineScope(Dispatchers.Main).launch which is essentially no different than using GlobalScope.launch(Dispatchers.Main). It creates an unbound (never cancelled) scope, which is a common source of memory leaks. If the coroutine that is calling this suspend function is cancelled, the other coroutine you have launched will not be notified and cancelled, since it is not a child.

    Secondly, the other coroutine does not wait for it--the events of the inner coroutine can come some time in the future.

    To ensure you register your API on the main thread, use a withContext(Dispatchers.Main) call around this whole function. Then, the suspendCancellableCoroutine lambda block will run on the main thread, so you'll be calling the API registration function on the main thread.

    Some other points about implementing this:

    • Observer has an onSubscribe function that hands you a Disposable that you can use to cancel early. You need to do this to support cancellation.
    • Calling continuation.resume() more than once will crash your coroutine, so you need some safeguards just in case the API surprises you and emits more than one item.
    • I added another safeguard for the possible case of the subscription ending without emitting anything.
    • In onError, I also check continuation.isActive to avoid a multiple-resume crash in the possible case of a single item being emitted followed by an error occurring before the subscription ends.

    Since the Kotlin coroutines library is open source, you can see how they implemented Observable.await here for an example of how to do this kind of thing properly.

    The solution should look something like:

    override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> = withContext(Dispatchers.Main) {
        suspendCancellableCoroutine { continuation ->
            val observer = object : Observer<RegisterResponse<Profile?>> {
                lateinit var subscription: Disposable
                var seenValue = false
    
                override fun onSubscribe(disposable: Disposable) {
                    subscription = disposable
                    continuation.invokeOnCancellation { subscription.dispose() }
                }
    
                override fun onNext(t: Profile) {
                     if (!seenValue) {
                         seenValue = true
                         continuation.resume(Success(t))
                         subscription.dispose()
                     }
                }
    
                override fun onComplete() {
                    if (continuation.isActive && !seenValue) {
                        continuation.resume(Error(NoSuchElementException("Observer completed without emitting any value.")))
                    }
                }
    
                override fun onError(throwable: Throwable) {
                    if (continuation.isActive) continuation.resume(Error(throwable))
                }
            }
            userManager.register(email, observer)
        }
    }