Tags: android, kotlin, kotlin-coroutines, kotlin-flow, asynccallback

'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block. Error in Kotlin flow


I have an app where users can upload, download, and view wallpapers. When a user logs in, I fetch that user's details from Firestore with this function:

override suspend fun getUserDetails(userId: String): Flow<Response<User>> = callbackFlow {
    val listener = fireStore.collection(Consts.USERS_COLLECTION_NAME)
        .document(userId)
        .addSnapshotListener { snapShot, error ->
            val result = if (snapShot != null) {
                val userInfo = snapShot.toObject(User::class.java)
                Response.Success(userInfo!!)
            } else {
                Response.Error(error?.message ?: error.toString())
            }
            trySend(result)
        }
//        awaitClose()

    awaitClose {
        this.cancel()
        listener.remove()
        close()
    }
}

and I am getting this error:

java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.

I have seen many questions about this and I have tried ChatGPT's code, but I keep getting the same error.

My ViewModel:

viewModelScope.launch {
    if (!firebaseUser?.isAnonymous!!) {
        userRepo.getUserDetails(firebaseUser.uid).collect {
            when (it) {
                is Response.Error -> sendUIEvents(UIEvents.ShowSnackBar(it.message))
                is Response.Loading -> Unit
                is Response.Success -> {
                    // Some code
                }
            }
        }
    }
}

My user flow is something like this:

Splash Screen > Home Screen > Profile Screen > Upload Wallpaper Screen

Note: The above code is used in both the Home screen and the Profile screen, and I am getting this error on the Home screen.

If you have a possible answer, please post it. One more thing: I have exactly the same code in a different file (not even the variable names are changed) and it does not throw any error there; only this code fails. Thank you.

My full stack trace:

FATAL EXCEPTION: main
Process: com.example.wallz, PID: 709
java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block. Otherwise, a callback/listener may leak in case of external cancellation. See callbackFlow API documentation for the details.
    at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(Builders.kt:343)
    at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:60)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69)
    at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:245)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513)
    at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.completeResumeReceive(AbstractChannel.kt:908)
    at kotlinx.coroutines.channels.ArrayChannel.offerInternal(ArrayChannel.kt:83)
    at kotlinx.coroutines.channels.AbstractSendChannel.trySend-JP2dKIU(AbstractChannel.kt:155)
    at kotlinx.coroutines.channels.ChannelCoroutine.trySend-JP2dKIU(Unknown Source:2)
    at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2.invokeSuspend$lambda-0(UserRepoImpl.kt:109)
    at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2.$r8$lambda$BnGWgnacAYeEFR-hHCZ0JzY-fc0(Unknown Source:0)
    at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2$$ExternalSyntheticLambda0.onEvent(Unknown Source:4)
    at com.google.firebase.firestore.DocumentReference.lambda$addSnapshotListenerInternal$2$com-google-firebase-firestore-DocumentReference(DocumentReference.java:504)
    at com.google.firebase.firestore.DocumentReference$$ExternalSyntheticLambda2.onEvent(Unknown Source:6)
    at com.google.firebase.firestore.core.AsyncEventListener.lambda$onEvent$0$com-google-firebase-firestore-core-AsyncEventListener(AsyncEventListener.java:42)
    at com.google.firebase.firestore.core.AsyncEventListener$$ExternalSyntheticLambda0.run(Unknown Source:6)
    at android.os.Handler.handleCallback(Handler.java:938)
    at android.os.Handler.dispatchMessage(Handler.java:99)
    at android.os.Looper.loopOnce(Looper.java:226)
    at android.os.Looper.loop(Looper.java:313)
    at android.app.ActivityThread.main(ActivityThread.java:8751)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:571)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1135)
    Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@b3d76f0, Dispatchers.Main.immediate]


Solution

  • Here is my solution, which I found after a lot of research:

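    override suspend fun getUserDetails(userId: String): Flow<Response<User>> = callbackFlow {
        try {
            // One-shot read: get() returns a Task and await() suspends until it completes.
            val snapshot = fireStore.collection(Consts.USERS_COLLECTION_NAME)
                .document(userId)
                .get()
                .await()

            val user = snapshot.toObject(User::class.java)

            val result = if (user != null) {
                Response.Success(user)
            } else {
                Response.Error("Could not get user's data.")
            }
            trySend(result)
        } catch (e: CancellationException) { // kotlinx.coroutines.CancellationException
            throw e // let cancellation propagate instead of reporting it as an error
        } catch (e: Exception) {
            trySend(Response.Error(e.message ?: e.toString()))
        } finally {
            // Close on every path; a block that returns with the channel still open
            // throws the same IllegalStateException.
            close()
        }
        awaitCancellation()
    }.flowOn(Dispatchers.IO)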
    

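    This fetches the user's data from Firestore once, and with it I no longer get the exception. get() performs a single read instead of registering a snapshot listener, so there is no listener left to remove in awaitClose.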

    Here I call get() and await() to read the document once, and at the end I call awaitCancellation(), which keeps the block suspended until the collector cancels it.
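If realtime updates are still wanted, the listener-based version can probably be kept. Below is a minimal sketch of the usual callbackFlow shape, assuming a hypothetical `FakeUserSource` class that stands in for Firestore's snapshot listener (it is not a Firebase API, and `userNames()` is an illustrative name as well). The point it tries to show: the listener is registered first, and `awaitClose` is the last statement of the block and only unregisters the listener, without calling `cancel()` or `close()` inside it.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a listener-based API such as a snapshot listener.
// Single-threaded sketch; a real source would need synchronization.
class FakeUserSource(private val current: String) {
    private val listeners = mutableListOf<(String) -> Unit>()

    val listenerCount: Int get() = listeners.size

    // Registers a listener, delivers the current value immediately,
    // and returns a handle that unregisters the listener again.
    fun addListener(listener: (String) -> Unit): () -> Unit {
        listeners.add(listener)
        listener(current)
        return { listeners.remove(listener) }
    }
}

fun FakeUserSource.userNames(): Flow<String> = callbackFlow {
    // Forward every callback value into the flow's channel.
    val remove = addListener { name -> trySend(name) }

    // Must be the last statement: suspends until the collector is done,
    // then runs the cleanup so the listener does not leak.
    awaitClose { remove() }
}

fun main() = runBlocking {
    val source = FakeUserSource(current = "alice")

    // first() takes one value and then cancels the flow, which triggers awaitClose.
    val name = source.userNames().first()

    println(name)                 // alice
    println(source.listenerCount) // 0, the listener was removed by awaitClose
}
```

With Firestore, the `remove` handle would correspond to the `ListenerRegistration` returned by `addSnapshotListener`, so the cleanup would be `awaitClose { listener.remove() }`.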