Tags: c#, system.reactive, tpl-dataflow

Sorting buffered Observables


I've got a stream of tokens that are produced very quickly and a processor that is relatively slow. The tokens are of three sub-types, and I would prefer them to be processed by their priority. So I would like the tokens to be buffered after they've been produced, while they wait to be processed, and have that buffer sorted by priority.

Here're my classes:

public enum Priority
{
    High   = 3,
    Medium = 2,
    Low    = 1
}

public class Base : IComparable<Base>
{
    public int Id { get; set; }

    public int CompareTo(Base other)
    {
        return Id.CompareTo(other.Id);
    }
}

public class Foo : Base { }
public class Bar : Base { }
public class Baz : Base { }

public class Token : IComparable<Token>
{
    private readonly string _toString;

    public Foo Foo { get; }

    public Bar Bar { get; }

    public Baz Baz { get; }

    public Priority Priority =>
        Baz == null
            ? Bar == null
                ? Priority.High
                : Priority.Medium
            : Priority.Low;

    public int CompareTo(Token other)
    {
        if (Priority > other.Priority)
        {
            return -1;
        }

        if (Priority < other.Priority)
        {
            return 1;
        }

        switch (Priority)
        {
            case Priority.High:
                return Foo.CompareTo(other.Foo);
            case Priority.Medium:
                return Bar.CompareTo(other.Bar);
            case Priority.Low:
                return Baz.CompareTo(other.Baz);
            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    public override string ToString()
    {
        return _toString;
    }

    public Token(Foo foo)
    {
        _toString = $"{nameof(Foo)}:{foo.Id}";
        Foo = foo;
    }

    public Token(Foo foo, Bar bar) : this(foo)
    {
        _toString += $":{nameof(Bar)}:{bar.Id}";
        Bar = bar;
    }

    public Token(Foo foo, Baz baz) : this(foo)
    {
        _toString += $":{nameof(Baz)}:{baz.Id}";
        Baz = baz;
    }
}
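The ordering that Token.CompareTo defines (higher Priority first, then the Id of whichever Foo, Bar or Baz decides the tier) can be checked in isolation. The following is a minimal sketch. It assumes a hypothetical SketchToken that stores its priority and deciding id directly instead of deriving them from Foo/Bar/Baz. It also shows one SortedSet<T> caveat: items whose CompareTo returns 0 count as duplicates and are silently dropped. The scheme therefore relies on ids being unique within each priority tier, which the producer code's Interval ids and Interlocked.Increment counters appear to guarantee.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical copy of the question's Priority enum, renamed to avoid clashes.
public enum SketchPriority { Low = 1, Medium = 2, High = 3 }

// Hypothetical stand-in for Token: stores the priority and the id that
// Token.CompareTo falls back to (Foo.Id, Bar.Id or Baz.Id).
public sealed class SketchToken : IComparable<SketchToken>
{
    public SketchPriority Priority { get; }
    public int Id { get; }

    public SketchToken(SketchPriority priority, int id)
    {
        Priority = priority;
        Id = id;
    }

    public int CompareTo(SketchToken other)
    {
        if (other is null) return 1; // by convention, any instance sorts after null

        // Reversed comparison: a higher priority sorts *earlier*, as in Token.CompareTo.
        var byPriority = ((int)other.Priority).CompareTo((int)Priority);
        return byPriority != 0 ? byPriority : Id.CompareTo(other.Id);
    }

    public override string ToString() => $"{Priority}:{Id}";
}

public static class OrderingSketch
{
    // Drains a SortedSet by repeatedly taking Min, i.e. highest priority first.
    public static List<string> Drain(IEnumerable<SketchToken> tokens)
    {
        var set = new SortedSet<SketchToken>(tokens); // items comparing equal are dropped here
        var order = new List<string>();
        while (set.Count > 0)
        {
            var next = set.Min;
            set.Remove(next);
            order.Add(next.ToString());
        }
        return order;
    }
}
```

Draining Low:1, High:0, Medium:2 and a second High:0 yields High:0, Medium:2, Low:1. The second High:0 never makes it into the set.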

And here is my producer code:

var random = new Random();
var bazId = 0;
var barId = 0;

var fooTokens = (from id in Observable.Interval(TimeSpan.FromSeconds(1))
                                      .Select(Convert.ToInt32)
                                      .Take(3)
                 select new Token(new Foo { Id = id }))
                .Publish();

var barTokens = (from fooToken in fooTokens
                 from id in Observable.Range(0, random.Next(5, 10))
                                      .Select(_ => Interlocked.Increment(ref barId))
                 select new Token(fooToken.Foo, new Bar { Id = id }))
                .Publish();

var bazTokens = (from barToken in barTokens
                 from id in Observable.Range(0, random.Next(1, 5))
                                      .Select(_ => Interlocked.Increment(ref bazId))
                 select new Token(barToken.Foo, new Baz { Id = id }))
                .Publish();

var tokens = bazTokens.Merge(barTokens)
                      .Merge(fooTokens)
                      .Do(dt =>
                      {
                          Console.ForegroundColor = ConsoleColor.Red;
                          Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
                      });

// Subscription

bazTokens.Connect();
barTokens.Connect();
fooTokens.Connect();

However, I'm a bit stuck on how to buffer and sort the tokens. If I do the following, the tokens appear to be produced and consumed at the same time, which suggests that there's some buffering going on behind the scenes, but I can't control it.

tokens.Subscribe(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});
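The lockstep behaviour is most likely not hidden buffering but the absence of it. Rx calls the subscriber's OnNext on the thread that produced the value and delivers the next value only after it returns, so the Thread.Sleep in the handler slows the whole pipeline down. To decouple the two sides, something has to queue tokens between them. The following is a minimal sketch using the BCL's BlockingCollection<T> (FIFO by default). Ints stand in for tokens, a plain loop stands in for the Rx subscription, and FifoBufferSketch is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class FifoBufferSketch
{
    // Pushes `count` items through a BlockingCollection and returns them in
    // the order the slow consumer processed them.
    public static List<int> Run(int count, int workMs)
    {
        var processed = new List<int>();
        using var buffer = new BlockingCollection<int>(); // backed by a ConcurrentQueue

        // Slow consumer on its own thread; GetConsumingEnumerable blocks
        // (without spinning) until an item arrives or CompleteAdding is called.
        var consumer = Task.Run(() =>
        {
            foreach (var item in buffer.GetConsumingEnumerable())
            {
                Thread.Sleep(workMs); // stand-in for the slow processor
                processed.Add(item);
            }
        });

        // Fast producer. In the Rx pipeline this loop would be replaced by
        // tokens.Subscribe(t => buffer.Add(t)); Add returns immediately, so
        // the producer is no longer held up by the consumer.
        for (var i = 0; i < count; i++)
        {
            buffer.Add(i);
        }

        buffer.CompleteAdding(); // lets the consuming loop finish
        consumer.Wait();
        return processed;
    }
}
```

Processing still happens strictly in arrival order, so this only solves the decoupling half of the problem. Sorting needs the buffer itself to be priority-ordered.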

If I use a TPL Dataflow ActionBlock, I can see the tokens being produced and processed correctly, but I'm still not sure how to do the sorting.

var proc = new ActionBlock<Token>(dt =>
{
    Thread.Sleep(TimeSpan.FromMilliseconds(250));
    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
});

tokens.Subscribe(dt => proc.Post(dt));

Any ideas or pointers where to go next would be appreciated!

Update:

I got something to work. I added a helper to clean up the code for displaying the test data:

private static void Display(Token dt, ConsoleColor col, int? wait = null)
{
    if (wait.HasValue)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(wait.Value));
    }
    Console.ForegroundColor = col;
    Console.WriteLine($"{DateTime.Now:mm:ss.fff}:{dt}");
}

I added a SortedSet:

var set = new SortedSet<Token>();

var tokens = bazTokens
    .Merge(barTokens)
    .Merge(fooTokens)
    .Do(dt => Display(dt, ConsoleColor.Red));

tokens.Subscribe(dt => set.Add(dt));

And I also added a consumer, although I'm not a fan of my implementation:

var source = new CancellationTokenSource();

Task.Run(() =>
{
    while (!source.IsCancellationRequested)
    {
        var dt = set.FirstOrDefault();
        if (dt == null)
        {
            continue;
        }

        if (set.Remove(dt))
        {
            Display(dt, ConsoleColor.Green, 250);
        }
    }
}, source.Token);

So now I'm getting exactly the results I'm looking for, but (a) I'm not happy with the polling while loop, and (b) if I want multiple consumers, I'm going to run into race conditions. So I'm still looking for better implementations, if anyone has one!
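Both concerns can be handled by keeping the SortedSet but pairing its lock with a wait/notify signal. SortedSet<T> isn't thread-safe, so even a single consumer reading it while the Rx subscription is adding is a race, and every access needs the same lock. The following is a minimal sketch, assuming a hypothetical SortedBlockingBuffer<T> wrapper (plus a hypothetical demo class) that uses Monitor.Wait/Monitor.Pulse so idle consumers sleep instead of polling. With Token as T, Min is the highest-priority token, because Token.CompareTo sorts High first.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: one lock guards the SortedSet, and Monitor.Wait/Pulse
// let idle consumers sleep instead of spinning in a while loop.
public sealed class SortedBlockingBuffer<T>
{
    private readonly SortedSet<T> _set;
    private readonly object _gate = new object();
    private bool _completed;

    public SortedBlockingBuffer(IComparer<T> comparer = null)
    {
        _set = new SortedSet<T>(comparer ?? Comparer<T>.Default);
    }

    // Producer side, e.g. tokens.Subscribe(t => buffer.Add(t)).
    public bool Add(T item)
    {
        lock (_gate)
        {
            var added = _set.Add(item); // false if an equal item is already buffered
            if (added)
            {
                Monitor.Pulse(_gate); // wake one waiting consumer, if any
            }
            return added;
        }
    }

    // No more items will arrive; consumers drain what is left and then stop.
    public void Complete()
    {
        lock (_gate)
        {
            _completed = true;
            Monitor.PulseAll(_gate);
        }
    }

    // Blocks until an item is available. Returns false once completed and empty.
    public bool TryTake(out T item)
    {
        lock (_gate)
        {
            while (_set.Count == 0)
            {
                if (_completed)
                {
                    item = default;
                    return false;
                }
                Monitor.Wait(_gate); // releases the lock while waiting
            }

            // Min and Remove under the same lock, so no two consumers get the same item.
            item = _set.Min;
            _set.Remove(item);
            return true;
        }
    }
}

public static class SortedBufferSketch
{
    // Starts `consumers` threads, feeds `items`, and returns everything taken.
    public static List<int> Run(int[] items, int consumers)
    {
        var buffer = new SortedBlockingBuffer<int>();
        var taken = new List<int>();
        var threads = new List<Thread>();

        for (var c = 0; c < consumers; c++)
        {
            var thread = new Thread(() =>
            {
                while (buffer.TryTake(out var item))
                {
                    lock (taken) { taken.Add(item); }
                }
            });
            thread.Start();
            threads.Add(thread);
        }

        foreach (var item in items) buffer.Add(item);
        buffer.Complete();
        foreach (var thread in threads) thread.Join();
        return taken;
    }
}
```

With the question's code, the producer side would be `tokens.Subscribe(dt => buffer.Add(dt))`. Each consumer thread would run `while (buffer.TryTake(out var dt)) Display(dt, ConsoleColor.Green, 250);`. The consumers are dedicated threads because TryTake blocks. Calling Complete() from the subscription's onCompleted callback lets them exit cleanly.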


Solution

  • Okay, so I used a plain lock around every access to the SortedSet, then increased the number of consumers, and it seems to be working fine. Although I haven't been able to come up with a pure Rx or a combined Rx / TPL Dataflow solution, this now does what I want, so I'll just show the changes I made on top of the update in the original question and leave it there.

    var set = new SortedSet<Token>();
    var locker = new object();
    
    var tokens = bazTokens
        .Merge(barTokens)
        .Merge(fooTokens)
        .Do(dt => Display(dt, ConsoleColor.Red));
    
    tokens.Subscribe(dt =>
    {
        lock (locker)
        {
            set.Add(dt);
        }
    });
    
    for (var i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Run(() =>
        {
            while (!source.IsCancellationRequested)
            {
                Token dt;
    
                lock (locker)
                {
                    dt = set.FirstOrDefault();
                }
    
                if (dt == null)
                {
                    continue;
                }
    
                bool removed;
    
                lock (locker)
                {
                    removed = set.Remove(dt);
                }
    
                if (removed)
                {
                    Display(dt, ConsoleColor.Green, 750);
                }
            }
        }, source.Token);
    }
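    The loop above still busy-waits. When the set is empty, every consumer goes straight back round the while loop, so Environment.ProcessorCount consumers can keep that many cores busy while idle. One alternative pairs a priority queue with a SemaphoreSlim so that idle consumers await instead of spinning, and the existing source.Token can stop them. The following is sketched under the assumption that .NET 6 or later is available (for PriorityQueue<TElement, TPriority>). AsyncPriorityBuffer<T> and BufferConsumers are hypothetical names. Unlike SortedSet, PriorityQueue keeps items that compare equal.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async buffer; PriorityQueue<TElement, TPriority> needs .NET 6 or later.
public sealed class AsyncPriorityBuffer<T>
{
    private readonly PriorityQueue<T, T> _queue;
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);

    public AsyncPriorityBuffer(IComparer<T> comparer = null)
    {
        // Each item is its own priority; the smallest per the comparer dequeues first,
        // which for Token means the highest Priority.
        _queue = new PriorityQueue<T, T>(comparer ?? Comparer<T>.Default);
    }

    public void Add(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item, item); // duplicates are kept, unlike SortedSet
        }
        _available.Release(); // one permit per buffered item
    }

    // Waits without blocking a thread; throws OperationCanceledException if cancelled.
    public async Task<T> TakeAsync(CancellationToken cancellationToken)
    {
        await _available.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_queue)
        {
            return _queue.Dequeue(); // a permit guarantees at least one item
        }
    }
}

public static class BufferConsumers
{
    // Runs until the token is cancelled; `process` stands in for Display(...).
    public static async Task ConsumeAsync<T>(
        AsyncPriorityBuffer<T> buffer, Action<T> process, CancellationToken token)
    {
        try
        {
            while (true)
            {
                var item = await buffer.TakeAsync(token).ConfigureAwait(false);
                process(item);
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Normal shutdown.
        }
    }
}
```

    In the setup above, the producer side would be roughly `tokens.Subscribe(dt => buffer.Add(dt));`. The consumers could be started with `for (var i = 0; i < Environment.ProcessorCount; i++) { _ = Task.Run(() => BufferConsumers.ConsumeAsync(buffer, dt => Display(dt, ConsoleColor.Green, 750), source.Token)); }`. Because Display calls Thread.Sleep, a thread is still tied up while a token is being processed. The gain is only that idle consumers no longer spin.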
    

    Thank you to the people who posted solutions; I appreciate the time you spent.