What is the correct way to prevent a subscriber from being called again before its previous call has completed?
I currently have a kind of race condition with code like this:
    SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
    {
        if (someBool)
        {
            await ...
            Start();
        }
        else
        {
            await ...
            Stop();
        }
    });
If SomeBool changes rapidly, the calls can end up in an order like this:
Start()
Stop()
Stop()
Start()
or worse. How can I ensure that the order is always
Start()
Stop()
Start()
Stop()
I could put a lock inside or use some kind of queue to enforce the order of calls, but I hope something already exists for this situation, or that I simply need to use reactive concepts correctly, e.g. by creating a new observable.
Forgot to add an MCVE. Create a new console app and add the NuGet packages ReactiveUI and ReactiveUI.Fody.
    using ReactiveUI;
    using ReactiveUI.Fody.Helpers;

    class Program
    {
        static SomeReactive SomeReactive { get; } = new();

        static void Main(string[] args)
        {
            // The async lambda becomes async void: Subscribe does not wait for it to finish,
            // so a later value can overtake an earlier one.
            SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
            {
                if (someBool)
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("start");
                }
                else
                {
                    await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                    Console.WriteLine("stop");
                }
            });

            // Toggle the property faster than the handler can reliably complete.
            for (int i = 0; i < 10; i++)
            {
                SomeReactive.SomeBool = !SomeReactive.SomeBool;
                Thread.Sleep(50);
            }
            Console.ReadKey();
        }
    }

    class SomeReactive : ReactiveObject
    {
        [Reactive]
        public bool SomeBool { get; set; }
    }
Using this answer, I've made my own SubscribeAsync extension method that takes a Func<T, Task>:
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> func) =>
        source.Select(o => Observable.FromAsync(_ => func(o)))
              .Concat()
              .Subscribe();
SubscribeAsync should be used instead of Subscribe(async o => ...). The latter turns the lambda into an async void method that Rx does not wait for, which is acceptable only if async void is not a problem (it could be).
P.S.: Naming the method SubscribeSynchronous, like ReactiveMarbles does, is also an option.