Search code examples
androidkotlinrx-javakotlinx.coroutines

Creating Kotlin coroutine Deferred object that emitts listener callback results


I´m currently switching from RxJava to Kotlin coroutines in a project, replacing all the Single and Observable return types by coroutine aquivalents. I´m still struggling with the following construct: An interface (e.g. of a repository) offers data query access and returns a RxJava Single. The implementation creates a Single object with Single.create and emits the result with onSuccess/onError. Now what the implementation needs to do to retrieve the data is creating a listener with callbacks and registering that listener to something. The callbacks of that created listener would then call onSuccess/onError on the selfmade Single. E.g. using firebase (although my question is not firebase specific):

interface Repository {
    fun getData(query: Query): Single<DataSnapshot?>
}

fun getData(query: Query): Single<DataSnapshot?> = Single.create { emitter ->
    query.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onCancelled(error: DatabaseError?) {
            emitter.onError(Exception())
        }

        override fun onDataChange(data: DataSnapshot?) {
            emitter.onSuccess(data)
        }
    })
}

Now what I want to have instead is the interface method returning coroutine Deferred. How does the implementation have to be created so that there also can be registered a listener with callbacks, whose results then will be delivered by the Deferred? I don´t see a way with these coroutine builders like async, launch etc. doing what onSuccess/onError would do.

interface Repository {
    fun getData(query: Query): Deferred<DataSnapshot?>
}

Solution

  • My suggestion is as follows:

    interface Repository {
        suspend fun getData(query: Query): Result<DataSnapshot>
    }
    

    where Result could be a sealed class with Success and Error cases:

    sealed class Result<T> {
        class Success<T>(result: T) : Result<T>()
        class Error<T>(error: String) : Result<T>()
    }
    

    This way, in the implementation side of getData you can do:

    return Success(yourData)
    

    or

    return Error("Something went wrong")
    

    In general when dealing with coroutines you should avoid returning the deferreds and try to use them "as synchronous methods".

    Edit: Now that I understand the problem I hope this helps solve it:

    //This is as generic as it gets, you could use it on any Query, no need to retype it
    suspend fun Query.await(): DataSnapshot = suspendCoroutine{cont ->
        addListenerForSingleValueEvent(object : ValueEventListener{
            override fun onCancelled(error: DatabaseError?) {
                cont.resumeWithException(error?: Exception("Unknown Error"))
            }
    
            override fun onDataChange(data: DataSnapshot?) {
                if(data != null){
                    cont.resume(data)
                } else {
                    cont.resumeWithException(Exception("Null data"))
                }
    
            }
        })
    }
    //this is your actual implementation
    suspend fun getData(query: Query):DataSnapshot =
            query.await()
    

    This code is assuming DatabaseError extends Exception or Throwable. If not you would need to create a wrapper type for it or use my original solution and use the regular resume in both cases.