Search code examples
androidrealmrx-javaobservablerealm-mobile-platform

RxJava Observable never perform onCompleted


I'm using RxJava with Realm to query the mobile Database. I need to inform my view to update the list when the query is done but it seems like the flatMap and the doOnNext works but then it never go onCompleted. I need a trigger to know it's over.

Realm.getDefaultInstance().asObservable().flatMap((realm) ->
        realm.where(FileDoc.class).isNull("parent").isNotNull("updatedDate").findAllSortedAsync("name").asObservable()
                .flatMap((files) -> {
                    documents.get("root").clear();
                    documents.get("root").addAll(realm.copyFromRealm(files));
                    return realm.where(Folder.class).isNull("parent").findAllSortedAsync("name").asObservable().doOnCompleted(() -> {
                        Log.e(TAG, "Barr!");
                    });
                })
                .doOnNext((folders) -> {
                    documents.get("root").addAll(realm.copyFromRealm(folders));
                })).doOnCompleted(() -> Log.e(TAG, "Fooo!"))
        .doOnError(error -> handleErrorEvent(error))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe();

Here Barr! and Fooo! are never printed... Anyone has a clue?


UPDATED

Realm.getDefaultInstance().asObservable().first().flatMap((realm) ->
                realm.where(IncidentTemplate.class).equalTo("deleted", false).findAllAsync().asObservable()
                        .filter(results -> results.isLoaded())
                        .first()
                        .doOnNext((files) -> {
                            if (!incidents.containsKey("INCIDENT")) {
                                incidents.put("INCIDENT", new ArrayList<>());
                            }

                            incidents.get("INCIDENT").clear();
                            incidents.get("INCIDENT").addAll(realm.copyFromRealm(files));

                        })
                        .doOnTerminate(() -> {
                            Log.e(TAG, "Closing realm");
                            realm.close();
                        }))
                .doOnCompleted(() -> {
                    emitStoreChange(new CobaltStore.CobaltStoreChangeEvent());
                    Log.e(TAG, "EMIT INCIDENT");
                })
                .doOnError(error -> handleErrorEvent(error))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(AndroidSchedulers.from(backgroundLooper))
                .subscribe();

Solution

  • 1.) Realm.getDefaultInstance().asObservable()

    You obtain a local Realm instance that you will never be able to close, therefore you are leaking memory.

    2.) this is not how asObservable() is designed to work.

    Does it make sense for an OnClickListener to complete, and emit terminal event?

    For example, you click the button once, then the observable "finishes" and will never emit any event ever again, even if the user clicks the button afterwards?

    Of course not, and therefore you should not expect it from Realm's RealmChangeListener either; which is what is bound to the Realm or RealmResults so that you can listen to any future writes made to the Realm or its specific table.

    Solution is probably to just use something like

    Observable.fromCallable(() => {
        try(Realm realm = Realm.getDefaultInstance()) {
            RealmResults<FileDoc> docs = realm.where(FileDoc.class)
                 .isNull("parent")
                 .isNotNull("updatedDate")
                 .findAllSorted("name");
            documents.get("root").clear(); // <-- shouldn't you modify a new copy and return this?
            List<FileDoc> unmanagedDocs = realm.copyFromRealm(files);
            documents.get("root").addAll(unmanagedDocs);
            return unmanagedDocs;
        }
    }
    

    Or as you pointed out, .filter((data) -> data.isLoaded()).first() also works, although I didn't think of that.