Tags: c#, multithreading, system.reactive, data-synchronization, thread-synchronization

Can Reactive Extensions satisfy order invariance, synchronization and multithreading?


I want to be able to process a stream of events on multiple cores, but keep everything synchronized so that events are processed in lock step by all subscribers, and no single subscriber ever gets ahead of any other subscriber.

In other words, I want a fast subscriber to wait until all of the slower subscribers are finished with each event before moving on to the next. Each subscriber will have a filter, so it only processes the events it is interested in.

If this works, I can easily take advantage of all of the cores in my system, without running into too many multithreading or synchronization issues.

Example

Imagine we have a stream of RX events generated on a single thread. We have two RX subscribers, A and B. We have these constraints:

  • Each RX event must be processed in lock step by all subscribers, i.e. event j=2 will not be processed by subscriber B until event j=1 has been completely processed by all subscribers A and B, event j=3 will not be processed by subscriber B until event j=2 has been completely processed by all subscribers A and B, etc.
  • Parallel processing of each RX event, i.e. subscriber A can process event j=1 in parallel with subscriber B processing event j=1, etc.
  • Order invariance, i.e. all subscribers receive the events in the order that they are created, so event j=0 will always precede j=1, event j=1 will always precede j=2, etc. This happens automatically if events are pushed in on a single thread, so this constraint has already been met.

What I have so far

I have tried a lot of combinations of Synchronize, in conjunction with the following code:

var sw = Stopwatch.StartNew();
var rx = new Subject<int>();
rx.ObserveOn(ThreadPoolScheduler.Instance)
    .Subscribe(o =>
{
    // Fast Subscriber A. Takes 20 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(20));
    Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

rx.ObserveOn(ThreadPoolScheduler.Instance)              
    .Subscribe(o =>
{
    // Slow Subscriber B. Takes 500 milliseconds.
    Thread.Sleep(TimeSpan.FromMilliseconds(500));
    Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
});

for (int j = 0; j < 5; j++)
{

    int j1 = j;
    rx.OnNext(j1);
    Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
}

Current output of program

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 288 milliseconds.
Subscriber A: j=1 (thread 10). Time: 308 milliseconds.
Subscriber A: j=2 (thread 10). Time: 328 milliseconds.
Subscriber A: j=3 (thread 10). Time: 348 milliseconds.
Subscriber A: j=4 (thread 10). Time: 368 milliseconds.
Subscriber B: j=0 (thread 11). Time: 768 milliseconds.
Subscriber B: j=1 (thread 11). Time: 1268 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1768 milliseconds.
Subscriber B: j=3 (thread 11). Time: 2268 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2768 milliseconds.

Desired output of program

Push: j=0 (thread 9)
Push: j=1 (thread 9)
Push: j=2 (thread 9)
Push: j=3 (thread 9)
Push: j=4 (thread 9)
[any key to continue]
Subscriber A: j=0 (thread 10). Time: 000 milliseconds.
Subscriber B: j=0 (thread 11). Time: 000 milliseconds.
Subscriber A: j=1 (thread 10). Time: 500 milliseconds.
Subscriber B: j=1 (thread 11). Time: 500 milliseconds.
Subscriber A: j=2 (thread 10). Time: 1000 milliseconds.
Subscriber B: j=2 (thread 11). Time: 1000 milliseconds.
Subscriber A: j=3 (thread 10). Time: 1500 milliseconds.
Subscriber B: j=3 (thread 11). Time: 1500 milliseconds.
Subscriber A: j=4 (thread 10). Time: 2000 milliseconds.
Subscriber B: j=4 (thread 11). Time: 2000 milliseconds.

Essentially, I want all subscribers to process event j=0 in parallel, then all subscribers to process event j=1 in parallel, etc., even if some of the subscribers are slower than the others. In this case, Subscriber A is fast (20 milliseconds) and Subscriber B is slow (500 milliseconds), so we need some sort of lock or gate so that subscriber A waits for subscriber B to finish before moving on to the next event, or vice versa if subscriber B is faster than subscriber A.

Of course, this is what occurs naturally in single threaded mode, but then one loses the ability for the same event to be processed in parallel by many subscribers, which means I cannot easily take advantage of all of the cores on my system.
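
To make the "lock or gate" idea concrete outside of Rx, here is a minimal sketch using System.Threading.Barrier from the base class library. It is not from the original post: the names BarrierSketch and Run and the delay values are hypothetical, and it assumes the events are available up front as an array rather than pushed through a Subject. Each subscriber thread handles an event and then calls SignalAndWait, so no thread starts event k+1 until every thread has finished event k.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class BarrierSketch
{
    // Returns (subscriber, event) pairs in the order the work finished.
    public static (int Subscriber, int Event)[] Run(int[] events, int[] delaysMs)
    {
        var log = new ConcurrentQueue<(int, int)>();

        // One participant per subscriber thread.
        using var barrier = new Barrier(delaysMs.Length);

        var threads = delaysMs.Select((delay, id) => new Thread(() =>
        {
            foreach (var e in events)
            {
                Thread.Sleep(delay);      // simulated work for this subscriber
                log.Enqueue((id, e));
                barrier.SignalAndWait();  // gate: wait until everyone has finished e
            }
        })).ToArray();

        foreach (var t in threads) t.Start();
        foreach (var t in threads) t.Join();
        return log.ToArray();
    }
}
```

Calling BarrierSketch.Run(new[] { 0, 1, 2 }, new[] { 20, 500 }) should return six entries in which both entries for event 0 come before either entry for event 1, and so on. The order of the two subscribers within a single event is not guaranteed.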

Update

Thank you @Jonas Chapuis for your answer using Sort().

However, in this particular case, what I am aiming for is to stop fast subscribers getting ahead of slow subscribers when consuming events, i.e. I need some sort of lock or gate so that a fast subscriber will wait until all of the slow subscribers have finished with the event, before moving onto the next event.

In other words, I want all subscribers to move in lock step through the events, with no individual subscriber getting ahead of the rest. The RX events will be created on a single thread, so they will never get out of order.

Update

After some months, it turns out that I was using the wrong architecture, and this was the wrong question to ask.

Instead of observing on ThreadPoolScheduler.Instance, I should be observing on an EventLoopScheduler, which runs all subscriptions on a single dedicated thread. This preserves the order.

In order to get parallelism for time series data, it's better to divide the data processing into a pipeline with multiple stages, with each thread concentrating on a single pipeline stage. This is much simpler to deal with, and satisfies all of the constraints above.
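
A rough sketch of what such a pipeline might look like, assuming one EventLoopScheduler per stage. The class name PipelineSketch, the Run method and the two placeholder stages (multiply by 10, then format as a string) are hypothetical and only stand in for real processing. Each stage runs on its own dedicated thread, so stage 1 can already work on the next event while stage 2 is still busy with the previous one, and ordering is preserved throughout.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class PipelineSketch
{
    public static List<string> Run(int count)
    {
        var results = new List<string>();
        using var done = new ManualResetEventSlim();

        // One dedicated thread per pipeline stage.
        using var stage1 = new EventLoopScheduler();
        using var stage2 = new EventLoopScheduler();
        var source = new Subject<int>();

        using var subscription = source
            .ObserveOn(stage1)
            .Select(x => x * 10)          // stage 1 work (placeholder)
            .ObserveOn(stage2)
            .Select(x => $"item {x}")     // stage 2 work (placeholder)
            .Subscribe(x => results.Add(x), () => done.Set());

        for (int j = 0; j < count; j++) source.OnNext(j);
        source.OnCompleted();

        done.Wait();                      // block until the last stage has completed
        return results;
    }
}
```

PipelineSketch.Run(3) returns "item 0", "item 10", "item 20", in that order.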


Solution

  • OK, I had indeed not fully understood your requirements, sorry about that. Below is a different approach, which relies on subscribers signalling that they are done via dedicated subjects. These subjects are then zipped together, which gives you the "lock" semantics (note that overloads of the Zip operator support up to 16 sources).

     var sw = Stopwatch.StartNew();
     var rx = new Subject<int>();
     var subscriberADone = new Subject<Unit>();
     var subscriberBDone = new Subject<Unit>();
     var bothSubscribersDone = subscriberADone.Zip(subscriberBDone, (_, __) => Unit.Default);
     var lockStepInput = rx.Zip(bothSubscribersDone.StartWith(Unit.Default), (i, _) => i);
     lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
         .Subscribe(o =>
         {
             // Fast Subscriber A. Takes 20 milliseconds.
             Thread.Sleep(TimeSpan.FromMilliseconds(20));
             Console.Write("Subscriber A: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
             subscriberADone.OnNext(Unit.Default);
         });
    
     lockStepInput.ObserveOn(ThreadPoolScheduler.Instance)
         .Subscribe(o =>
         {
             // Slow Subscriber B. Takes 500 milliseconds.
             Thread.Sleep(TimeSpan.FromMilliseconds(500));
             Console.Write("Subscriber B: {0} (thread {1}). Time: {2} milliseconds.\n", o, Thread.CurrentThread.ManagedThreadId.ToString("").PadLeft(2), sw.ElapsedMilliseconds);
             subscriberBDone.OnNext(Unit.Default);
         });
    
     for (int j = 0; j < 5; j++)
     {
    
         int j1 = j;
         rx.OnNext(j1);
         Console.Write("Push: {0} (thread {1})\n", j, Thread.CurrentThread.ManagedThreadId);
     }
    

    This generated the following output:

    Push: 0 (thread 9)
    Push: 1 (thread 9)
    Push: 2 (thread 9)
    Push: 3 (thread 9)
    Push: 4 (thread 9)
    Subscriber A: 0 (thread 10). Time: 111 milliseconds.
    Subscriber B: 0 (thread 11). Time: 591 milliseconds.
    Subscriber A: 1 (thread 10). Time: 611 milliseconds.
    Subscriber B: 1 (thread 11). Time: 1091 milliseconds.
    Subscriber A: 2 (thread 10). Time: 1111 milliseconds.
    Subscriber B: 2 (thread 11). Time: 1591 milliseconds.
    Subscriber A: 3 (thread 10). Time: 1611 milliseconds.
    Subscriber B: 3 (thread 11). Time: 2091 milliseconds.
    Subscriber A: 4 (thread 10). Time: 2111 milliseconds.
    Subscriber B: 4 (thread 11). Time: 2591 milliseconds.
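
The same idea can probably be generalised to an arbitrary number of subscribers by using the Observable.Zip overload that takes a collection of observables, instead of zipping the "done" subjects pairwise. The following is a sketch under that assumption, not a tested production implementation. The names LockStepSketch and Run, the delay array and the CountdownEvent (used only so that the method can wait for all work to finish) are hypothetical additions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class LockStepSketch
{
    // Returns (subscriber, event) pairs in the order the work finished.
    public static (int Subscriber, int Event)[] Run(int eventCount, int[] delaysMs)
    {
        var log = new ConcurrentQueue<(int, int)>();
        var source = new Subject<int>();

        // One "done" subject per subscriber.
        var done = delaysMs.Select(_ => new Subject<Unit>()).ToArray();

        // Fires once every subscriber has signalled for the current event.
        var allDone = Observable.Zip(done.Select(d => d.AsObservable()))
                                .Select(_ => Unit.Default);

        // The first event is released immediately, later ones only after allDone.
        var lockStep = source.Zip(allDone.StartWith(Unit.Default), (e, _) => e);

        using var finished = new CountdownEvent(eventCount * delaysMs.Length);
        var subscriptions = delaysMs.Select((delay, id) =>
            lockStep.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(e =>
            {
                Thread.Sleep(delay);             // simulated work
                log.Enqueue((id, e));
                done[id].OnNext(Unit.Default);   // tell the gate this subscriber is done
                finished.Signal();
            })).ToArray();

        for (int j = 0; j < eventCount; j++) source.OnNext(j);

        finished.Wait();
        foreach (var s in subscriptions) s.Dispose();
        return log.ToArray();
    }
}
```

With LockStepSketch.Run(3, new[] { 5, 40, 15 }), every entry for event 0 should appear in the returned log before any entry for event 1, and likewise for event 2. The order of subscribers within one event depends on their delays and on thread scheduling.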