Tags: c#, concurrency, task-parallel-library, tpl-dataflow

Dynamically subscribe/unsubscribe in TPL Dataflow


I have a stream of messages, and based on some criteria I want each consumer to process some of them in parallel. Each consumer should be able to subscribe and unsubscribe dynamically.

I have the following input data constraints:

  • Around 500 messages per second
  • Around 15,000 consumers
  • Around 500 categories
  • In most cases, each consumer is subscribed to 1-3 categories.

So far this is what I have:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Test
{
    static void Main()
    {
        var consumer1 = new Consumer("Consumer1");
        consumer1.SubscribeForCategory(1);
        consumer1.SubscribeForCategory(2);

        var consumer2 = new Consumer("Consumer2");
        consumer2.SubscribeForCategory(2);
        consumer2.SubscribeForCategory(3);
        consumer2.SubscribeForCategory(4);

        var consumer3 = new Consumer("Consumer3");
        consumer3.SubscribeForCategory(3);
        consumer3.SubscribeForCategory(4);

        var consumers = new[] {consumer1, consumer2, consumer3};
        var publisher = new Publisher(consumers);

        var message1 = new Message(1, "message1 test");
        var message2 = new Message(2, "message2");
        var message3 = new Message(1, "message3");
        var message4 = new Message(3, "message4 test");
        var message5 = new Message(4, "message5");
        var message6 = new Message(3, "message6");

        var messages = new[] {message1, message2, message3, message4, message5, message6};

        foreach (var message in messages)
        {
            publisher.Publish(message);
        }

        Console.ReadLine();
    }
}

public class Message
{
    public Message(int categoryId, string data)
    {
        CategoryId = categoryId;
        Data = data;
    }

    public int CategoryId { get; }

    public string Data { get; }
}

public class Publisher
{
    private readonly IEnumerable<Consumer> _consumers;

    public Publisher(IEnumerable<Consumer> consumers)
    {
        _consumers = consumers;
    }

    public void Publish(Message message)
    {
        IEnumerable<Consumer> consumers = _consumers.Where(c => c.CategoryIds.Contains(message.CategoryId));
        foreach (Consumer consumer in consumers)
        {
            consumer.AddMessage(message);
        }
    }
}

public class Consumer
{
    private readonly HashSet<int> _categoryIds;
    private readonly ActionBlock<Message> _queue;

    public Consumer(string name)
    {
        Name = name;
        _categoryIds = new HashSet<int>();

        _queue = new ActionBlock<Message>(async m => { await Foo(m); }, 
                                          new ExecutionDataflowBlockOptions 
                                          {
                                              MaxDegreeOfParallelism = 1, 
                                              SingleProducerConstrained = true
                                          });
    }

    public string Name { get; }

    public IReadOnlyCollection<int> CategoryIds => _categoryIds;

    public void AddMessage(Message message)
    {
        bool accepted = _queue.Post(message);
        if (!accepted)
        {
            Console.WriteLine("Message has been rejected!");
        }
    }

    public void SubscribeForCategory(int categoryId)
    {
        _categoryIds.Add(categoryId);
    }

    private async Task Foo(Message message)
    {
        // process message
        await Task.Delay(10);

        if (message.Data.Contains("test"))
        {
            _categoryIds.Remove(message.CategoryId);
        }

        Console.WriteLine($"[{Name}] - category id: [{message.CategoryId}] data: [{message.Data}]");
    }
}

Unfortunately, there are several issues with that solution:

  1. While a consumer is processing messages, it may unsubscribe from a category for which messages are already sitting in its ActionBlock input queue.
  2. In Publisher.Publish I iterate over each consumer's category collection, while the consumer's Foo method may remove a category at the same time, which leads to: System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
  3. I am also not sure whether it's a good idea to have the "dispatching logic" inside Publisher.Publish().

One possible solution is to forward all messages to every consumer (each consumer then decides whether or not to process them), but I am afraid that this is going to be much slower.
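For reference, that broadcast variant could be sketched with a BroadcastBlock. This is only a sketch under assumptions not in the posted code: it assumes Consumer exposes its internal ActionBlock through a hypothetical Queue property, and reuses the consumers array from Main:

```csharp
// Sketch of the "forward everything" alternative. A BroadcastBlock offers
// every message to every linked target; each link's predicate filters out
// the unwanted ones. (Queue is a hypothetical property exposing the
// consumer's private ActionBlock.)
var broadcaster = new BroadcastBlock<Message>(msg => msg);

foreach (var consumer in consumers)
{
    broadcaster.LinkTo(consumer.Queue,
                       msg => consumer.CategoryIds.Contains(msg.CategoryId));
}

// Publishing becomes a single post; no per-consumer loop in the publisher.
broadcaster.Post(new Message(1, "hello"));
```

An offer declined by a link's predicate is simply skipped for that target, so a non-matching consumer doesn't hold up the broadcaster.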

I am aware of actor model-based frameworks like Akka.Net and Microsoft Orleans, but I want all of this to happen in-process (if it's achievable of course).

Does anyone have a more elegant solution? Do you have any suggestions on how can I improve the current approach?


Solution

  • The TPL Dataflow library already provides what you want. Its blocks aren't queues, they're the actual producers and consumers. You could remove almost all of the code you added. You could even use a LINQ query to create and link the "publisher" and "consumers":

    var n=10;
    var consumers=( from i in Enumerable.Range(0,n)
                    let categories=new ConcurrentDictionary<int,int>()
                    select new { 
                                 Block=new ActionBlock<Message>(msg=>Consume(msg,categories),
                                                                blockOptions),
                                 Categories=categories
                    }).ToArray();
    
    foreach(var pair in consumers)
    {
        publisher.LinkTo(pair.Block,linkOptions,msg=>IsAllowed(msg,pair.Categories));
    }
    
    bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)
    {
        return categories.ContainsKey(msg.CategoryId);
    }
    
    async Task Consume(Message message,ConcurrentDictionary<int,int> categories)
    {
        if (message.Data.Contains("test"))
        {
            categories.TryRemove(message.CategoryId, out _);
        }
        ...
    }
    

    It's no accident that the blocks work with functions. The Dataflow library and the CSP paradigm it's based on are very different from OOP, and much closer to functional programming.

    By the way, TPL Dataflow grew out of the Microsoft Robotics Framework and the Concurrency Runtime. In robotics and automation there are a lot of microprocessors exchanging messages. Dataflow is specifically built to create complex processing meshes and handle lots of messages.

    Explanation

    Dataflow isn't a set of queues, it contains active blocks that are meant to be linked in a pipeline. An ActionBlock isn't a queue, it has a queue. In reality it's a Consumer, typically found at the tail of a pipeline. A TransformBlock receives incoming messages, processes them one by one then sends them to any linked blocks.

    Blocks are linked, so you don't need to manually take messages from one block and pass them to another. The Link can contain a predicate, used to filter the messages accepted by target blocks. It's possible to cut a link by calling Dispose on it.

    Assuming this is the "consumer" method :

    async Task Consume(Message message)
    {
        await Task.Delay(100);
        Console.WriteLine($"Category id: [{message.CategoryId}] data: [{message.Data}]");
    }
    

    You can create a few ActionBlocks, perhaps in an array :

    var consumers=new[]{
         new ActionBlock<Message>(Consume),
         new ActionBlock<Message>(Consume),
         new ActionBlock<Message>(Consume)
    };
    

    Each action block could use a different delegate of course.

    The "head" of the pipeline should probably be a TransformBlock. In this case, the Publisher doesn't do anything except get linked to the target blocks. At least we can print something:

    Message PassThrough(Message message)
    {
        Console.WriteLine("Incoming");
        return message;
    }
    
    var publisher=new TransformBlock<Message,Message>(PassThrough);
    

    You can link the "publisher" to the "consumers" with LinkTo :

    var options=new DataflowLinkOptions { PropagateCompletion=true};
    
    var link1=publisher.LinkTo(consumers[0],options, msg=>msg.CategoryId % 3==0);
    var link2=publisher.LinkTo(consumers[1],options, msg=>msg.CategoryId % 3==1);
    var link3=publisher.LinkTo(consumers[2],options, msg=>msg.CategoryId % 3==2);
    

    Messages produced by the "publisher" block will be sent to the first target whose link predicate accepts them. Messages are offered to links in the order the links were created. If no link accepts a message, it will stay in the output queue and block everything behind it.

    In real scenarios one should always ensure that all messages are handled, or that there is a fallback block that accepts anything the other links rejected:

    publisher.LinkTo(theOtherBlock,options);
    
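    Dataflow also ships a ready-made catch-all for exactly this: `DataflowBlock.NullTarget<T>()` accepts and discards anything offered to it. A one-line sketch, linked after all the real consumers so it only sees what they rejected:

```csharp
// Linked last, NullTarget accepts (and discards) any message no earlier
// link wanted, so the publisher's output queue can never get stuck on an
// unmatched message.
publisher.LinkTo(DataflowBlock.NullTarget<Message>());
```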

    The link1, link2, link3 objects are just IDisposable objects. Disposing one breaks the corresponding link:

    link2.Dispose();
    

    Links can be created and broken at any time, changing the shape of the pipeline (or mesh in more complex designs) as needed. Any messages already posted to a target block's queue won't be discarded if a link is broken or modified though.

    To reduce the number of unwanted messages we can add a bound to each block's input queue:

    var blockOptions=new ExecutionDataflowBlockOptions { BoundedCapacity=1 };
    
    var consumers=new[]{
         new ActionBlock<Message>(Consume,blockOptions),
         new ActionBlock<Message>(Consume,blockOptions),
         new ActionBlock<Message>(Consume,blockOptions)
    };
    

    To change the accepted messages dynamically, we can store the allowed values in e.g. a ConcurrentDictionary, which stays safe even when a link predicate checks a message at the same time a consumer modifies the permitted values:

    ConcurrentDictionary<int,int>[] _allowedCategories=new[] {
        new ConcurrentDictionary<int,int>(),
        new ConcurrentDictionary<int,int>(),
        new ConcurrentDictionary<int,int>(),
    };
    
    async Task Consume(Message message,ConcurrentDictionary<int,int> categories)
    {
        if (message.Data.Contains("test"))
        {
            categories.TryRemove(message.CategoryId, out _);
        }
        ...
    }
    

    And the "consumers" change to

    var consumers=new[]{
         new ActionBlock<Message>(msg=>Consume(msg,_allowedCategories[0])),
         new ActionBlock<Message>(msg=>Consume(msg,_allowedCategories[1])),
         new ActionBlock<Message>(msg=>Consume(msg,_allowedCategories[2]))
    };
    

    It's better to create a separate method for the link predicate:

    bool IsAllowed(Message msg,ConcurrentDictionary<int,int> categories)
    {
        return categories.ContainsKey(msg.CategoryId);
    }
    
    var link1=publisher.LinkTo(consumers[0],options, msg=>IsAllowed(msg,_allowedCategories[0]));
    var link2=publisher.LinkTo(consumers[1],options, msg=>IsAllowed(msg,_allowedCategories[1]));
    
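    With that in place, dynamic subscribe/unsubscribe, the original requirement, is nothing more than mutating the dictionary. A sketch (category 7 is an arbitrary example):

```csharp
// Subscribe consumer 0 to category 7: the link predicate reading the same
// ConcurrentDictionary starts accepting category-7 messages on the next offer.
_allowedCategories[0].TryAdd(7, 0);

// Unsubscribe: messages already queued in that consumer's ActionBlock are
// still processed; only future offers get filtered out.
_allowedCategories[0].TryRemove(7, out _);
```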

    One could create all of these with LINQ and `Enumerable.Range`. Whether that's a good idea is another matter:

    var n=10;
    var consumers=( from i in Enumerable.Range(0,n)
                    let categories=new ConcurrentDictionary<int,int>()
                    select new { 
                        Block=new ActionBlock<Message>(msg=>Consume(msg,categories),
                                                       blockOptions),
                        Categories=categories
                    }).ToArray();
    
    foreach(var pair in consumers)
    {
        publisher.LinkTo(pair.Block,linkOptions,msg=>IsAllowed(msg,pair.Categories));
    }
    

    No matter how the mesh is built, publishing to it is the same: use SendAsync on the head block.

    for(int i=0;i<1000;i++)
    {
        var msg=new Message(...);
        await publisher.SendAsync(msg);
    }
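    One detail the loop above doesn't show is shutdown. A sketch of draining the mesh, relying on the PropagateCompletion=true link option set earlier:

```csharp
// Signal that no more messages are coming. With PropagateCompletion=true
// on the links, completion flows from the publisher to every linked consumer.
publisher.Complete();

// Wait until every consumer has drained its input queue.
// (consumers is the ActionBlock<Message> array; needs System.Linq.)
await Task.WhenAll(consumers.Select(c => c.Completion));
```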