I'm trying to introduce Rx in my Xamarin mobile app and I'd like to chain a sequence of calls during my login phase on app startup.
TL;DR: How do I run two or three observables one after the other to retrieve data, and set up threading properly?
In more detail, I perform the following:
1. LoginUser() : IObservable<User>
2. Depending on the user type, RetrieveRemoteA() : IObservable<A> or RetrieveRemoteB() : IObservable<B>
3. Once I have the data (A or B), I update the UI.
This is a sort of diagram to describe the flow (explained above):
[flow diagram]
Considering that I want to avoid calling new observables (sources) from within the Subscribe() of the previous one, this is what I've implemented to retrieve my data and update the UI (the flow in the image has been serialized in the code below).
    IObservable<User> loginUserObservable = LogInUser(currentUser);
    loginUserObservable
        .SubscribeOn(ThreadPoolScheduler.Instance)
        .SelectMany(
            (user) =>
            {
                if (user.Type == UserType.A)
                    return RetrieveRemoteA(user.UserId); // outputs IObservable<A>
                return Observable.Return(new A());
            },
            (user, a) =>
            {
                B b = null;
                return new { user, a, b }; // Create anonymous type to keep track of 'user'
            })
        .SelectMany(
            (xType) =>
            {
                if (xType.user.Type == UserType.B)
                    return RetrieveRemoteB(xType.user.UserId); // outputs IObservable<B>
                return Observable.Return(new B());
            },
            (xType, bData) =>
            {
                var user = xType.user;
                var a = xType.a;
                var b = bData; // carry the retrieved B forward
                return new { user, a, b };
            })
        .ObserveOn(ImmediateScheduler.Instance)
        .Select((xType) =>
        {
            if (xType.user.Type == UserType.A)
            {
                A a = xType.a;
                B b = null;
                return new { a, b };
            }
            else
            {
                A a = null;
                B b = xType.b;
                return new { a, b };
            }
        })
        .Subscribe((result) =>
        {
            if (result.a != null)
            {
                Console.WriteLine($"ID: {result.a.Id}");
            }
            else
            {
                Console.WriteLine($"ID: {result.b.Id}");
            }
        });
Once running, the flow seems to get stuck inside RetrieveRemoteA(user.UserId), even though the method finishes without errors.
    public IObservable<A> RetrieveRemoteA(string userId)
    {
        return Observable.FromAsync<A>(async () =>
        {
            A a = await CustomAPI(userId);
            return a;
        });
    }
When I implement the flow sequentially by calling each new observable within the Subscribe of the previous one, it works properly (but it is not the right way to do it).
I think this is an issue either with threads or with a faulty Rx implementation on my part.
Do you have any clue, please?
What you want to do is really quite easy using the Observable.Amb operator. Its job is to let two observables run and, once one of them produces a value, ignore the other one.
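To see that behaviour in isolation, here is a minimal sketch, assuming the System.Reactive package is referenced. The `AmbDemo` class and `FirstToRespond` method are names made up for this illustration. One source never emits anything, so Amb always ends up mirroring the other.

```csharp
using System;
using System.Reactive.Linq;

public static class AmbDemo
{
    // Races two sources. Never<int>() stays silent forever,
    // so the Return(42) source is always the one Amb picks.
    public static int FirstToRespond()
    {
        IObservable<int> silent = Observable.Never<int>();
        IObservable<int> answering = Observable.Return(42);

        // Amb mirrors whichever source reacts first and drops the other.
        // Wait() blocks until the chosen sequence completes and returns its last value.
        return Observable.Amb(silent, answering).Wait();
    }

    public static void Main()
    {
        Console.WriteLine(FirstToRespond()); // prints 42
    }
}
```

This is why `Observable.Never<T>()` works as the "not applicable" branch: it can never win the race.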
Here's how to write your query:
    var query =
        from user in loginUserObservable
        let a_observable = user.Type == UserType.A ? RetrieveRemoteA(user.UserId) : Observable.Never<A>()
        let b_observable = user.Type == UserType.B ? RetrieveRemoteB(user.UserId) : Observable.Never<B>()
        from ab in
            Observable
                .Amb(
                    a_observable.Select(x => new { a = x, b = (B)null }),
                    b_observable.Select(x => new { a = (A)null, b = x }))
        select ab;
That's it.
Given this sample data you can have a play:
    var currentUser = "";
    IObservable<User> LogInUser(string cu) => Observable.Start(() => new User() { Type = UserType.A, UserId = "Z1" });
    IObservable<A> RetrieveRemoteA(string id) => Observable.Start(() => new A() { Id = "A.Z2" });
    IObservable<B> RetrieveRemoteB(string id) => Observable.Start(() => new B() { Id = "B.Z3" });
    IObservable<User> loginUserObservable = LogInUser(currentUser);
    /* put `query` here */
    query
        .Subscribe(x =>
            Console.WriteLine($@"ID: {(x.a != null ? x.a.Id : x.b.Id)}"));
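The sample relies on `User`, `A`, `B` and `UserType`, which the question never shows. The sketch below fills them in with hypothetical minimal shapes, inferred only from the members the snippets use (`Type`, `UserId`, `Id`), and wraps the Amb query in a made-up `SampleRun.Run` helper so both branches can be tried. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-ins: only the members the snippets above touch.
public enum UserType { A, B }
public class User { public UserType Type { get; set; } public string UserId { get; set; } }
public class A { public string Id { get; set; } }
public class B { public string Id { get; set; } }

public static class SampleRun
{
    static IObservable<A> RetrieveRemoteA(string id) => Observable.Start(() => new A { Id = "A.Z2" });
    static IObservable<B> RetrieveRemoteB(string id) => Observable.Start(() => new B { Id = "B.Z3" });

    // Logs in a fake user of the given type, runs the Amb query, and returns the retrieved Id.
    public static string Run(UserType type)
    {
        IObservable<User> loginUserObservable =
            Observable.Start(() => new User { Type = type, UserId = "Z1" });

        var query =
            from user in loginUserObservable
            let a_observable = user.Type == UserType.A ? RetrieveRemoteA(user.UserId) : Observable.Never<A>()
            let b_observable = user.Type == UserType.B ? RetrieveRemoteB(user.UserId) : Observable.Never<B>()
            from ab in Observable.Amb(
                a_observable.Select(x => new { a = x, b = (B)null }),
                b_observable.Select(x => new { a = (A)null, b = x }))
            select ab;

        // Wait() blocks until the query completes; a real app would call Subscribe instead.
        var result = query.Wait();
        return result.a != null ? result.a.Id : result.b.Id;
    }

    public static void Main()
    {
        Console.WriteLine($"ID: {Run(UserType.A)}"); // prints ID: A.Z2
        Console.WriteLine($"ID: {Run(UserType.B)}"); // prints ID: B.Z3
    }
}
```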
Alternatively, you could just avoid the Amb operator entirely by doing this simple query:
    var query =
        from user in loginUserObservable
        from ab in
            user.Type == UserType.A
                ? RetrieveRemoteA(user.UserId).Select(x => new { a = x, b = (B)null })
                : RetrieveRemoteB(user.UserId).Select(x => new { a = (A)null, b = x })
        select ab;
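The question also asks where the threading operators belong. One possible arrangement, offered as a sketch rather than a diagnosis of the original hang, is to apply `SubscribeOn` and `ObserveOn` once, at the end of the finished query, just before `Subscribe`. The `SchedulingSketch` class, the `ForUi` helper and its `uiScheduler` parameter are hypothetical. In a Xamarin app the scheduler passed in might be a `SynchronizationContextScheduler` built from the UI thread's `SynchronizationContext`; the console demo below substitutes an `EventLoopScheduler`.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulingSketch
{
    // Hypothetical helper: 'source' stands for the finished query,
    // 'uiScheduler' for wherever UI updates are required to run.
    public static IObservable<T> ForUi<T>(IObservable<T> source, IScheduler uiScheduler) =>
        source
            .SubscribeOn(TaskPoolScheduler.Default) // perform the subscription off the caller's thread
            .ObserveOn(uiScheduler);                // deliver notifications on the given scheduler

    public static void Main()
    {
        using var done = new ManualResetEventSlim();
        using var loop = new EventLoopScheduler(); // stand-in for a UI thread in a console app

        ForUi(Observable.Return("ID: A.Z2"), loop)
            .Subscribe(x => Console.WriteLine(x), () => done.Set());

        done.Wait(); // keep the process alive until the sequence completes
    }
}
```

Keeping both operators in one place makes it easier to see which thread each part of the pipeline runs on.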