RxJava2 Kotlin SwitchMap from RealmResults to Observable


OK this is my first question in RxJava so please be gentle.

I'm querying Realm for existing users and getting a RealmResults list back as a Flowable. I would then like to either create a new user or return the existing one, and convert the result to JSON.

This is what I have so far. I'm a bit stuck.

fun getUsers(realm: Realm): Flowable<RealmResults<User>> {
    return when (realm.isAutoRefresh) {
        true -> realm.where<User>().findAllAsync().asFlowable().filter(RealmResults<User>::isLoaded)
        false -> Flowable.just(realm.where<User>().findAll())
    }
}

fun checkNewUserRequired(realm: Realm, results: RealmResults<User>): Observable<String> {
    if (results.isEmpty()) {
        //not complete, I will create a new user here
        return Observable.just("Dummy")
    } else {
        val user = realm.where<User>().findFirst()!!
        val detachedUser = realm.copyFromRealm(user)
        return Observable.just(userToJsonString(realm, detachedUser))
    }
}

val getNewUser = getUsers(realm)
    .take(1)
    .switchMap { results -> checkNewUserRequired(realm, results) }
    .subscribe {
        //log result
        result: String -> Log.d(TAG, "JSON OUTPUT: $result")
    }

The error is on the switchMap call. I'm very familiar with the operator in RxJS, but I'm struggling with the syntax here.

Any help much appreciated.


Solution

  • You are trying to switchMap a Flowable into an Observable, and the two are different types: Flowable.switchMap expects the mapper to return a Publisher (such as another Flowable), and an Observable is not one. You need to convert one side so the types match.

    The easiest solution in your case, since it looks like you will not have any backpressure issues, is to change checkNewUserRequired to return a Flowable.

    Example

    fun checkNewUserRequired(realm: Realm, results: RealmResults<User>): Flowable<String> = Flowable.just(
        if (results.isEmpty()) "Dummy"
        else {
            val user = realm.where<User>().findFirst()!!
            val detachedUser = realm.copyFromRealm(user)
            userToJsonString(realm, detachedUser)
        }
    )
    

    You can also convert an existing Observable to a Flowable with toFlowable, but then you need to specify a BackpressureStrategy. Since Observable.just emits a single item here, the choice of strategy should make no practical difference.

    Example

    .switchMap { results -> checkNewUserRequired(realm, results).toFlowable(BackpressureStrategy.DROP) }
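
To see the type fix in isolation, here is a minimal sketch with Realm taken out of the picture. It assumes only RxJava 2 (io.reactivex) on the classpath. The names users, asJsonFlowable and asJsonObservable are hypothetical stand-ins for getUsers and checkNewUserRequired, and the "JSON for ..." string is a placeholder for whatever userToJsonString returns. The third variant is one more possibility not covered above: convert the source with toObservable first, then use Observable's own switchMap.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Observable

// Hypothetical stand-in for getUsers(realm): emits a single list of user names.
fun users(): Flowable<List<String>> = Flowable.just(listOf("alice"))

// Hypothetical stand-ins for checkNewUserRequired, one per return type.
fun asJsonFlowable(names: List<String>): Flowable<String> =
    Flowable.just(if (names.isEmpty()) "Dummy" else "JSON for ${names.first()}")

fun asJsonObservable(names: List<String>): Observable<String> =
    Observable.just(if (names.isEmpty()) "Dummy" else "JSON for ${names.first()}")

// Option 1: the mapper already returns a Flowable, so the types line up.
fun viaFlowable(): String =
    users().take(1)
        .switchMap { names -> asJsonFlowable(names) }
        .blockingFirst()

// Option 2: keep the Observable-returning function and adapt it inside the mapper.
fun viaToFlowable(): String =
    users().take(1)
        .switchMap { names -> asJsonObservable(names).toFlowable(BackpressureStrategy.DROP) }
        .blockingFirst()

// Option 3: convert the source to an Observable first, then use Observable.switchMap.
fun viaToObservable(): String =
    users().take(1)
        .toObservable()
        .switchMap { names -> asJsonObservable(names) }
        .blockingFirst()

fun main() {
    println(viaFlowable())
    println(viaToFlowable())
    println(viaToObservable())
}
```

blockingFirst is used here only to keep the sketch short; in an Android app you would keep the subscribe call from the question.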