Tags: google-cloud-firestore, rx-java2, rx-android, reactivex, rx-kotlin

Avoid a memory leak when using Observable.create() to emit listener objects


I'm writing a wrapper around a FirebaseFirestore snapshot listener that emits the changes through an RxKotlin Observable.

I wrote the following class, which uses the create() method to build the observable and emit changes asynchronously whenever a new data snapshot is available.

The problem is that I leak memory every time I create an instance of this class and then stop using it. What is the best way to rewrite this class so that it doesn't leak?

Any resources on creating Observables that emit objects from listeners would be really helpful!

class DocumentRepository<T : ModelWithMetadata>(
        path: List<String>,
        private val model: Class<T>) {

    private var documentReference: DocumentReference

    val observable: Observable<T>

    private var emitter: ObservableEmitter<T>? = null
    private lateinit var item: T


    init {
        documentReference = FirebaseFirestore.getInstance().collection(path[0]).document(path[1])
        for (i in 2..path.lastIndex step 2)
            documentReference = documentReference.collection(path[i]).document(path[i + 1])

        observable = Observable.create(this::listenChanges)
    }

    private fun listenChanges(emitter: ObservableEmitter<T>) {
        this.emitter = emitter
        documentReference.addSnapshotListener { documentSnapshot, _ ->
            item = documentSnapshot.toObject(this.model)
            this.emitter?.onNext(item)
        }
    }

    fun get() {
        emitter?.onNext(item)
    }

    fun put(item: T) {
        item.updatedAt = TimeExtension.now()
        documentReference.set(item)
    }

    fun delete(item: T) {
        documentReference.delete()
    }
}

Solution

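  • The leak comes from the snapshot listener never being removed: the listener lambda captures the repository instance (it uses this.model, item and this.emitter), and Firestore keeps the listener registered until it is explicitly detached.

    documentReference.addSnapshotListener returns a ListenerRegistration, and calling ListenerRegistration#remove detaches the listener.

    ObservableEmitter#setCancellable lets you register cleanup code, in this case detaching the listener, that runs when the Observable's subscription is disposed.

    So your listenChanges would look like this: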

    private fun listenChanges(emitter: ObservableEmitter<T>) {
      this.emitter = emitter
      // Keep the registration so the listener can be detached later.
      val registration = documentReference.addSnapshotListener { documentSnapshot, _ ->
        item = documentSnapshot.toObject(this.model)
        this.emitter?.onNext(item)
      }
      // Runs when the subscriber disposes or the stream terminates.
      emitter.setCancellable { registration.remove() }
    }
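
To see the cleanup happen without a Firebase project, the same pattern can be tried against a stand-in listener source. The following is only a minimal sketch: FakeDocument, its addListener/publish/listenerCount members and the changes() extension are hypothetical names made up for illustration (they are not part of Firestore), and the imports assume RxJava 2 (package io.reactivex) is on the classpath.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for a Firestore document: it stores listeners and
// returns a handle that removes one, playing the role of ListenerRegistration.
class FakeDocument<T : Any> {
    private val listeners = mutableListOf<(T) -> Unit>()

    val listenerCount: Int
        get() = listeners.size

    fun addListener(listener: (T) -> Unit): () -> Unit {
        listeners.add(listener)
        return { listeners.remove(listener) }
    }

    fun publish(value: T) {
        // Iterate over a copy so a listener may detach itself while notified.
        listeners.toList().forEach { it(value) }
    }
}

// Each subscription registers its own listener and detaches it on dispose.
fun <T : Any> FakeDocument<T>.changes(): Observable<T> =
    Observable.create<T> { emitter ->
        val remove = addListener { value ->
            if (!emitter.isDisposed) emitter.onNext(value)
        }
        emitter.setCancellable { remove() }
    }

fun main() {
    val document = FakeDocument<String>()
    val received = mutableListOf<String>()

    val disposable = document.changes().subscribe { received.add(it) }
    document.publish("first")
    println(document.listenerCount) // 1 while subscribed

    disposable.dispose()            // triggers the cancellable
    document.publish("second")      // no listener is attached any more
    println(document.listenerCount) // 0 after dispose
    println(received)               // [first]
}
```

Note that the cleanup only runs if the caller actually disposes the subscription, for example by keeping the returned Disposable (or adding it to a CompositeDisposable) and calling dispose() when the screen or component goes away. The sketch also keeps the emitter local to the create block rather than in a field, so several subscribers each get their own listener; whether that fits the original class depends on how get() is meant to be used.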