Reactive.Subject make onNext wait for previous onNext action

I am trying to implement an observable that waits for the onNext action to complete before proceeding with the next one. The only approach I have found that works is a SemaphoreSlim. Does Reactive offer a way to do this without SemaphoreSlim? I couldn't find one.

    SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
    subject.Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500)))
        .SelectMany(c => c.ToList())
        .Subscribe(async x =>
        {
            await _semaphoreSlim.WaitAsync();
            try
            {
                // await async code here
            }
            finally { _semaphoreSlim.Release(); }
        });

Forcing Subscribe to be sync


  • To start with I created a sample Media_Load_Async that simulates what you're doing.

    static int max = 5;
    static Random random = new Random();
    // One random delay of 2 to 6 seconds per item.
    static TimeSpan[] delays = Enumerable.Range(0, max).Select(x => TimeSpan.FromSeconds(random.Next(5) + 2)).ToArray();

    async Task<Unit> Media_Load_Async(int index, TimeSpan delay)
    {
        Console.WriteLine($"Start {index} - {delay}");
        await Task.Delay(delay);
        Console.WriteLine($"End {index}");
        return Unit.Default;
    }

    If I run this simple query then I get the results that you're trying to avoid:

    Observable.Range(0, max)
        .Subscribe(async x => await Media_Load_Async(x, delays[x]));

    Start 0 - 00:00:05
    Start 1 - 00:00:06
    Start 2 - 00:00:02
    Start 3 - 00:00:06
    Start 4 - 00:00:04
    End 2
    End 4
    End 0
    End 3
    End 1
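
The interleaving above happens because Subscribe takes an Action<T>, so the async lambda compiles to an async void method that hands control back to the caller at its first incomplete await. The source is then free to push the next value straight away. Here is a minimal sketch of that behaviour without Rx; Push and Run are hypothetical names made up for illustration, and Push only imitates a source calling its handler:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for a source that calls an Action<int> handler,
    // the same delegate shape that Subscribe(Action<T>) accepts.
    public static void Push(Action<int> onNext, int count)
    {
        for (var i = 0; i < count; i++)
        {
            onNext(i); // returns at the handler's first incomplete await
        }
    }

    // Returns the log as it stands once every value has been pushed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>(); // stays incomplete while we push

        // The async lambda becomes an async void method: nothing can await it.
        Push(async x =>
        {
            log.Add($"Start {x}");
            await gate.Task;
            log.Add($"End {x}");
        }, 3);

        var snapshot = new List<string>(log); // taken before any handler can finish
        gate.SetResult(true);                 // let the handlers resume
        return snapshot;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // Start 0, Start 1, Start 2
    }
}
```

Every handler has started before any of them has finished, which is the same pattern as the Start/End listing above.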

    If I put in your SemaphoreSlim code I get what I think you want:

    SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
    Observable.Range(0, max)
        .Subscribe(async x =>
        {
            await _semaphoreSlim.WaitAsync();
            try { await Media_Load_Async(x, delays[x]); }
            finally { _semaphoreSlim.Release(); }
        });

    Start 0 - 00:00:04
    End 0
    Start 1 - 00:00:03
    End 1
    Start 2 - 00:00:06
    End 2
    Start 3 - 00:00:04
    End 3
    Start 4 - 00:00:03
    End 4

    Next, if I move the call to Media_Load_Async to inside the query itself then I'm back to the original issue:

    Observable.Range(0, max)
        .SelectMany(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
        .Subscribe();

    Start 0 - 00:00:06
    Start 1 - 00:00:02
    Start 2 - 00:00:06
    Start 3 - 00:00:05
    Start 4 - 00:00:02
    End 4
    End 1
    End 3
    End 0
    End 2

    But if I change out the SelectMany for a Select/Concat pair then I get what you want without a SemaphoreSlim:

    Observable.Range(0, max)
        .Select(x => Observable.FromAsync(() => Media_Load_Async(x, delays[x])))
        .Concat()
        .Subscribe();

    Start 0 - 00:00:04
    End 0
    Start 1 - 00:00:05
    End 1
    Start 2 - 00:00:02
    End 2
    Start 3 - 00:00:06
    End 3
    Start 4 - 00:00:06
    End 4
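
The same Select/Concat pair can be applied to the Window/Throttle pipeline from the question. This is a sketch under assumptions rather than a tested drop-in: SelectSequentially and ProcessBatchAsync are hypothetical names (neither is part of Rx.NET), the element type int is assumed, and ProcessBatchAsync only stands in for the "await async code here" placeholder.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SequentialExtensions
{
    // Hypothetical helper (not part of Rx.NET) wrapping the Select/Concat pair.
    public static IObservable<Unit> SelectSequentially<T>(
        this IObservable<T> source, Func<T, Task> action) =>
        source
            .Select(x => Observable.FromAsync(() => action(x))) // deferred: runs only when subscribed
            .Concat(); // subscribes to the next inner observable after the previous one completes
}

public static class BatchDemo
{
    // Hypothetical stand-in for the asker's async work on each batch.
    static async Task ProcessBatchAsync(IList<int> batch)
    {
        Console.WriteLine($"Start batch of {batch.Count}");
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine($"End batch of {batch.Count}");
    }

    public static void Main()
    {
        var subject = new Subject<int>();

        using var subscription = subject
            .Window(() => subject.Throttle(TimeSpan.FromMilliseconds(500)))
            .SelectMany(c => c.ToList())
            .SelectSequentially(batch => ProcessBatchAsync(batch))
            .Subscribe();

        subject.OnNext(1);
        subject.OnNext(2);

        // Keep the process alive long enough for the throttle and the batch to run.
        Task.Delay(TimeSpan.FromSeconds(3)).Wait();
    }
}
```

Concat queues inner observables that arrive while an earlier one is still running, so batches should be processed one at a time in arrival order, with no semaphore in the subscriber.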