Tags: c#, winforms, async-await, system.reactive, rx.net

Reactive.Subject make onNext wait for previous onNext action


I am trying to make an observable wait for the current onNext action to complete before it proceeds with the next one. The only approach I have found that works is a SemaphoreSlim. Does Rx offer a way to do this without a SemaphoreSlim? I couldn't find one.

    SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
    subject.Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500)))
        .SelectMany(c => c.ToList())
        .Subscribe(async x =>
        {
            await _semaphoreSlim.WaitAsync();
            try
            {
                // await async code here
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        });

Forcing Subscribe to be sync


Solution

    To start with, I created a sample method, Media_Load_Async, that simulates what you're doing:

    static int max = 5;
    static Random random = new Random();
    static TimeSpan[] delays = Enumerable.Range(0, max).Select(x => TimeSpan.FromSeconds(random.Next(5) + 2)).ToArray();
    
    async Task<Unit> Media_Load_Async(int index, TimeSpan delay)
    {
        Console.WriteLine($"Start {index} - {delay}");
        await Task.Delay(delay);
        Console.WriteLine($"End {index}");
        return Unit.Default;
    }
    

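    If I run this simple query then I get the results that you're trying to avoid. The async lambda passed to Subscribe becomes an async void handler, so Subscribe does not wait for it and every load starts immediately: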

    Observable
        .Range(0, max)
        .Subscribe(async x => await Media_Load_Async(x, delays[x]));
    
    Start 0 - 00:00:05
    Start 1 - 00:00:06
    Start 2 - 00:00:02
    Start 3 - 00:00:06
    Start 4 - 00:00:04
    End 2
    End 4
    End 0
    End 3
    End 1
    

    If I put in your SemaphoreSlim code I get what I think you want:

    SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
    Observable
        .Range(0, max)
        .Subscribe(async x =>
        {
            await _semaphoreSlim.WaitAsync();
            try
            {
                await Media_Load_Async(x, delays[x]);
            }
            finally
            {
                _semaphoreSlim.Release();
            }
        });
    
    Start 0 - 00:00:04
    End 0
    Start 1 - 00:00:03
    End 1
    Start 2 - 00:00:06
    End 2
    Start 3 - 00:00:04
    End 3
    Start 4 - 00:00:03
    End 4
    

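    Next, if I move the call to Media_Load_Async inside the query itself then I'm back to the original issue, because SelectMany subscribes to every inner observable as soon as it is produced and merges their results: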

    Observable
        .Range(0, max)
        .SelectMany(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
        .Subscribe();
    
    Start 0 - 00:00:06
    Start 1 - 00:00:02
    Start 2 - 00:00:06
    Start 3 - 00:00:05
    Start 4 - 00:00:02
    End 4
    End 1
    End 3
    End 0
    End 2
    

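    But if I swap the SelectMany for a Select/Concat pair then I get what you want without a SemaphoreSlim. Observable.FromAsync only starts the task when it is subscribed to, and Concat subscribes to each inner observable only after the previous one has completed: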

    Observable
        .Range(0, max)
        .Select(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
        .Concat()
        .Subscribe();
    
    Start 0 - 00:00:04
    End 0
    Start 1 - 00:00:05
    End 1
    Start 2 - 00:00:02
    End 2
    Start 3 - 00:00:06
    End 3
    Start 4 - 00:00:06
    End 4
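
The same Select/Concat pair can probably be dropped into the Window/Throttle pipeline from the question. The following is only a sketch under some assumptions: ProcessBatchAsync is a hypothetical stand-in for the "await async code here" part, the Where filter that skips empty batches is my addition rather than something the question asked for, and the delays are arbitrary values chosen so that the second batch arrives while the first is still running.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

var log = new List<string>();   // order in which batches start and end

// Hypothetical stand-in for the async work done per batch in the question.
async Task<Unit> ProcessBatchAsync(IList<int> batch)
{
    log.Add($"Start [{string.Join(",", batch)}]");
    await Task.Delay(TimeSpan.FromSeconds(1));
    log.Add($"End [{string.Join(",", batch)}]");
    return Unit.Default;
}

var subject = new Subject<int>();
var finished = new TaskCompletionSource<bool>();

using var subscription = subject
    .Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500)))
    .SelectMany(c => c.ToList())
    // Assumption: empty windows are not interesting, so skip them.
    .Where(batch => batch.Count > 0)
    // FromAsync defers the call until Concat subscribes to this inner observable.
    .Select(batch => Observable.FromAsync(() => ProcessBatchAsync(batch)))
    // Concat subscribes to the next inner observable only after the previous one completes.
    .Concat()
    .Subscribe(
        _ => { },
        ex => finished.TrySetException(ex),
        () => finished.TrySetResult(true));

// First burst: closed into one batch once the source has been quiet for 500 ms.
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

// Second burst arrives while the first batch is (most likely) still being processed.
await Task.Delay(TimeSpan.FromSeconds(1));
subject.OnNext(4);
subject.OnNext(5);
subject.OnCompleted();

await finished.Task;
foreach (var line in log)
{
    Console.WriteLine(line);
}
```

With Concat in place, the batch containing 4 and 5 should be queued and only start once the batch containing 1, 2 and 3 has logged its End line, so no SemaphoreSlim is needed in the subscriber. The sample requires the System.Reactive package.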