How to queue WhenAnyValue subscriber calls containing asynchronous code?


What is the correct way to prevent a subscriber from being called again, in parallel, before its previous call has completed?

I currently have a kind of race condition with code like this:

SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
{
    if(someBool)
    {
        await ...
        Start();
    }
    else
    {
        await ...
        Stop();
    }
});

If SomeBool changes rapidly, the calls can end up in an order like this:

Start()
Stop()
Stop()
Start()

or worse. How can I ensure that it is always

Start()
Stop()
Start()
Stop()

I could put a lock inside or use some kind of queue to enforce the order of calls. But I hope something already exists for this situation, or perhaps I need to use the reactive concepts correctly, e.g. by creating a new observable.


I forgot to add an MCVE. Create a new console app and add the NuGet packages ReactiveUI and ReactiveUI.Fody.

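using System;
using System.Threading;
using System.Threading.Tasks;
using ReactiveUI;
using ReactiveUI.Fody.Helpers;

class Program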
{
    static SomeReactive SomeReactive { get; } = new();

    static void Main(string[] args)
    {
        SomeReactive.WhenAnyValue(o => o.SomeBool).Subscribe(async someBool =>
        {
            if (someBool)
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("start");
            }
            else
            {
                await Task.Delay((int)(Random.Shared.NextDouble() * 100));
                Console.WriteLine("stop");
            }
        });

        for (int i = 0; i < 10; i++)
        {
            SomeReactive.SomeBool = !SomeReactive.SomeBool;
            Thread.Sleep(50);
        }

        Console.ReadKey();
    }
}

class SomeReactive : ReactiveObject
{
    [Reactive]
    public bool SomeBool { get; set; }
}

Solution

  • Based on this answer, I wrote my own SubscribeAsync extension method that takes the async handler as a parameter:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> func) =>
        source.Select(o => Observable.FromAsync(_ => func(o)))
            .Concat()
            .Subscribe();
    

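    Use SubscribeAsync instead of Subscribe(async o => ...). With the latter, the lambda becomes async void, so Subscribe does not wait for it to finish and the handlers for successive values can overlap. Keep Subscribe(async o => ...) only if that is not a problem (it could be).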

    P.S.: Naming the method SubscribeSynchronous, as ReactiveMarbles does, is also an option.
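
To see the ordering guarantee in isolation, here is a minimal sketch of the extension in use. It assumes only the System.Reactive package and swaps in a Subject<bool> for WhenAnyValue(o => o.SomeBool), so it runs without ReactiveUI or Fody. The Demo class, the RunAsync method, and the toggles parameter are hypothetical names introduced for illustration, and error handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class ObservableExtensions
{
    // Same shape as the extension above: every value becomes a deferred async
    // operation, and Concat() subscribes to the next one only after the
    // previous one has completed.
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> func) =>
        source.Select(o => Observable.FromAsync(() => func(o)))
            .Concat()
            .Subscribe();
}

public static class Demo
{
    // Hypothetical helper: toggles a bool rapidly and records the order in
    // which the async handlers finish.
    public static async Task<List<string>> RunAsync(int toggles = 10)
    {
        var log = new List<string>();
        var allHandled = new TaskCompletionSource<bool>();

        // Assumption: Subject<bool> stands in for WhenAnyValue(o => o.SomeBool).
        using var subject = new Subject<bool>();

        using var subscription = subject.SubscribeAsync(async value =>
        {
            // Random delay, as in the MCVE, so unordered handlers would interleave.
            await Task.Delay(Random.Shared.Next(0, 100));
            log.Add(value ? "start" : "stop"); // handlers never overlap, so no lock is needed
            if (log.Count == toggles)
            {
                allHandled.TrySetResult(true);
            }
        });

        var state = false;
        for (var i = 0; i < toggles; i++)
        {
            state = !state;
            subject.OnNext(state); // values arrive faster than they are handled
            await Task.Delay(10);
        }

        await allHandled.Task; // wait until the queued handlers have all run
        return log;
    }

    public static async Task Main()
    {
        foreach (var entry in await RunAsync())
        {
            Console.WriteLine(entry);
        }
    }
}
```

Although the handler delays are random, the log should alternate start, stop, start, stop, because Concat queues the pending values and runs one handler at a time. Replacing SubscribeAsync with Subscribe(async value => ...) in this sketch lets the handlers overlap, which brings back the interleaving described in the question.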