Search code examples
javamultithreadingrealmrx-javaobservable

How do I prevent Realm Threading issue when using a Realm call within an Rx Observable transaction?


This is the ViewModel where the Rx transactions are called

RealmHelperRepo is the implementation of HelperRepo interface

@PerActivity
public class RoleSelectionViewModel extends BaseViewModel<RoleSelectionMvvm.View> implements RoleSelectionMvvm.ViewModel {

     private Disposable roleGroupSubscription;

     @Inject
        public RoleSelectionViewModel(@AppContext Context context, HelperRepo helperRepo, ApiOAuth2 ApiOAuth2) {

        this.mContext = context;
        this.mUserRepo = userRepo;
        this.mHelperRepo = helperRepo;
        ApiOAuth2.initialize();
        this.mApiOAuth2 = ApiOAuth2;

        this.mCurrentUser = mUserRepo.getByField("isLoggedIn", true, true);
        if (mCurrentUser != null) {
            this.mCurrentUserId = mCurrentUser.getId();
            this.mHelper = mHelperRepo.getByField("user.id", mCurrentUserId, true);
    }

  Observable<Response<ResponseHelper>> postHelperObservable = mApiOAuth2.postHelperRX(new Helper());
  Observable<Response<ResponseHelper>> getHelperObservable = mApiOAuth2.getHelperRX(mCurrentUserId);

 roleGroupSubscription = postRoleGroupsObservable
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .flatMap((response) -> {
                        if (response.isSuccessful()) {
                            ResponseHelper responseHelper = response.body();
                            mHelper = responseHelper.getHelper();
                            return Observable.just(mHelper);
                        } else if (response.code() == 409) {
                       // handle POST conflict (i.e. helper already exists)
                            return getHelperObservable;
                        }

                   })
                   .subscribe((data) -> {
                        if (data instanceof Response<?>) {
                            // data came from getHelperObservable
                            Response response = (Response) data;
                            if (!response.isSuccessful()) {
                               ResponseHelper responseHelper = (ResponseHelper) response.body();
                            mHelper = responseHelper.getHelper();
                       else {

                            // data came from Observable.just(helper)
                            mApiOAuth2.getHelperRX(mCurrentUserId).subscribe(
                                    responseHelperResponse -> {

                                        if (responseHelperResponse.isSuccessful()) {

                                            String helperID = responseHelperResponse.body().getHelper().getId();
                                            Log.d("RealmCount", "save: " + Realm.getLocalInstanceCount(realmProvider.get().getConfiguration()));
                                            mHelper.setId(helperID);
                                            mHelper.setUser(mCurrentUser);
--------> // when mHelperRepo.save(mHelper) is called, it goes to RealmHelperRepo to save and 
--------> // thus triggering mRealm.executeTransaction causing Realm threading
                                            mHelperRepo.save(mHelper);
                                        }
                                        saveAndBegin();
                                    },
                                    Throwable::printStackTrace);
                                    });

This is the RealmRepo class where realm calls are made.

@PerApplication
public class RealmHelperRepo implements HelperRepo {

   private final Provider<Realm> mRealmProvider;
   private Realm mRealm;

   @Inject
    public RealmHelperRepo(Provider<Realm> realmProvider) {
        this.mRealmProvider = realmProvider;
        this.mRealm = mRealmProvider.get();
}


  @Override
    public void save(Helper helper) {
        if (mRealm != null) {
---------> // code runs into threading issue here when a realmThread executeTransaction is called
        mRealm.executeTransaction(r -> r.copyToRealmOrUpdate(helper)); 
        }
    }

Is there something I'm missing here? Other Rx functions I should be using instead of flatmap? Are there other ways to save my observable data without running into threading issue? help!


Solution

  • Is there something I'm missing here?

    A Realm instance represents a reference-counted, thread-local instance. It's not a global thing, it's a "local instance" that is opened by getInstance() and then closed by close().

    So you can't just initialize a Realm instance as a singleton, because it won't be accessible from background threads.


    You could for example provide a singleton Realm manager class that is able to open thread-local Realm instances.

    /**
     * The RealmManager allows creating a singleton Realm manager which can open thread-local instances.
     *
     * It also allows obtaining the open thread-local instance without incrementing the reference count.
     */
    @PerApplication
    public class RealmManager {
        private final ThreadLocal<Realm> localRealms = new ThreadLocal<>();
    
        @Inject
        RealmManager() {
        }
    
        /**
         * Opens a reference-counted local Realm instance.
         *
         * @return the open Realm instance
         */
        public Realm openLocalInstance() {
            checkDefaultConfiguration();
            Realm realm = Realm.getDefaultInstance(); // <-- maybe configuration should be constructor parameter
            if(localRealms.get() == null) {
                localRealms.set(realm);
            }
            return realm;
        }
    
        /**
         * Returns the local Realm instance without adding to the reference count.
         *
         * @return the local Realm instance
         * @throws IllegalStateException when no Realm is open
         */
        public Realm getLocalInstance() {
            Realm realm = localRealms.get();
            if(realm == null) {
                throw new IllegalStateException(
                        "No open Realms were found on this thread.");
            }
            return realm;
        }
    
        /**
         * Closes local Realm instance, decrementing the reference count.
         *
         * @throws IllegalStateException if there is no open Realm.
         */
        public void closeLocalInstance() {
            checkDefaultConfiguration();
            Realm realm = localRealms.get();
            if(realm == null) {
                throw new IllegalStateException(
                        "Cannot close a Realm that is not open.");
            }
            realm.close();
            // noinspection ConstantConditions
            if(Realm.getLocalInstanceCount(Realm.getDefaultConfiguration()) <= 0) {
                localRealms.set(null);
            }
        }
    
        private void checkDefaultConfiguration() {
            if(Realm.getDefaultConfiguration() == null) {
                throw new IllegalStateException("No default configuration is set.");
            }
        }
    }
    

    You can use this like so

    @PerApplication
    public class RealmHelperRepo implements HelperRepo {
       private final RealmManager realmManager;
    
        @Inject
        public RealmHelperRepo(RealmManager realmManager) {
            this.realmManager = realmManager;
        }
    
    
        @Override
        public void save(Helper helper) {
            try(Realm realm = realmManager.openLocalInstance()) {
                realm.executeTransaction(r -> r.copyToRealmOrUpdate(helper)); 
            }
        }
    

    Technically it just hides the Realm.getDefaultInstance() call and allows you to obtain your thread-local instance even without incrementing the internal RealmCache reference count, so not much real magic there.

    Just open a Realm instance for a thread, and don't forget to close it when it is no longer needed.