OK this is my first question in RxJava so please be gentle.
I'm querying Realm for existing users and getting a RealmResults list back as a Flowable. I would then like to either create a new user or return the existing one, and convert the result to JSON.
This is what I have so far. I'm a bit stuck.
fun getUsers(realm: Realm): Flowable<RealmResults<User>> {
    return when (realm.isAutoRefresh) {
        true -> realm.where<User>().findAllAsync().asFlowable().filter(RealmResults<User>::isLoaded)
        false -> Flowable.just(realm.where<User>().findAll())
    }
}
fun checkNewUserRequired(realm: Realm, results: RealmResults<User>): Observable<String> {
    if (results.isEmpty()) {
        // not complete, I will create a new user here
        return Observable.just("Dummy")
    } else {
        val user = realm.where<User>().findFirst()!!
        val detachedUser = realm.copyFromRealm(user)
        return Observable.just(userToJsonString(realm, detachedUser))
    }
}
val getNewUser = getUsers(realm)
    .take(1)
    .switchMap { results -> checkNewUserRequired(realm, results) }
    .subscribe { result: String ->
        // log result
        Log.d(TAG, "JSON OUTPUT: $result")
    }
The error is on the switchMap call. I'm very familiar with the operator in RxJS, but I'm struggling with the syntax here.
Any help much appreciated.
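You are trying to switchMap a Flowable into an Observable, which are different types. Flowable.switchMap expects a function that returns a Publisher (such as another Flowable), and Observable is not a Publisher, so you need to convert from one type to the other.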
The easiest solution in your case, since it looks like you will not have any backpressure issues, is to change checkNewUserRequired to return a Flowable.
Example:
fun checkNewUserRequired(realm: Realm, results: RealmResults<User>): Flowable<String> = Flowable.just(
    if (results.isEmpty()) "Dummy"
    else {
        val user = realm.where<User>().findFirst()!!
        val detachedUser = realm.copyFromRealm(user)
        userToJsonString(realm, detachedUser)
    }
)
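To see the same idea without Realm in the way, here is a minimal sketch. The function `describeUsers` and the sample list are hypothetical stand-ins for `checkNewUserRequired` and the query results, and the import assumes RxJava 2 (`io.reactivex`); under RxJava 3 the package would be `io.reactivex.rxjava3.core`.

```kotlin
import io.reactivex.Flowable

// Hypothetical stand-in for checkNewUserRequired: because it returns a
// Flowable (which is a Publisher), it can be passed to Flowable.switchMap.
fun describeUsers(results: List<String>): Flowable<String> =
    Flowable.just(
        if (results.isEmpty()) "Dummy"
        else results.first()
    )

fun main() {
    // Stand-in for getUsers(realm): a Flowable that emits one result list.
    val users: Flowable<List<String>> = Flowable.just(listOf("alice", "bob"))

    val first = users
        .take(1)
        .switchMap { results -> describeUsers(results) }
        .blockingFirst() // blocks only to keep the sketch short

    println(first)
}
```

In real Android code you would keep the `subscribe` call from the question rather than `blockingFirst()`, which is used here only so the sketch runs as a plain `main`.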
You can also convert an existing Observable to a Flowable using the toFlowable function, but then you need to specify a BackpressureStrategy.
Example:
.switchMap { results -> checkNewUserRequired(realm, results).toFlowable(BackpressureStrategy.DROP) }
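A self-contained sketch of the conversion approach, again without Realm. `describeAsObservable` is a hypothetical stand-in for the original Observable-returning function, and the imports assume RxJava 2. `BackpressureStrategy.BUFFER` is used here only to illustrate an alternative to `DROP`.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Observable

// Hypothetical stand-in for the Observable-returning checkNewUserRequired.
fun describeAsObservable(results: List<String>): Observable<String> =
    Observable.just(if (results.isEmpty()) "Dummy" else results.first())

fun main() {
    // Stand-in for getUsers(realm) when no users exist yet.
    val users: Flowable<List<String>> = Flowable.just(emptyList<String>())

    val output = users
        .take(1)
        .switchMap { results ->
            // toFlowable bridges Observable -> Flowable; the strategy says
            // what happens when items arrive faster than they are requested.
            describeAsObservable(results).toFlowable(BackpressureStrategy.BUFFER)
        }
        .blockingFirst()

    println(output)
}
```

For a source that emits a single item, the choice of strategy is unlikely to matter much. As a rough guide, `BUFFER` queues items until the downstream requests them, `DROP` discards items the downstream is not ready for, and `LATEST` keeps only the most recent one.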