Tags: c#, task-parallel-library, tpl-dataflow, event-driven, system.threading.channels

Concurrent parallel work between two collections of objects


I am trying to simulate work between two collections asynchronously and in parallel. I have a ConcurrentQueue of customers and a collection of workers. I need each worker to take a Customer from the queue, perform work on the customer, and once done take another customer right away.

I decided to use an event-based approach: each worker performs an action on a customer, and the customer holds an event handler that fires when the customer is done. That event would ideally trigger the DoWork method again, so the workers keep taking customers from the queue in parallel. But I can't figure out how to pass a customer into DoWork from OnCustomerFinished()! The worker obviously shouldn't depend on a queue of customers.

public class Worker
{
    public async Task DoWork(ConcurrentQueue<Customer> cust)
    {
        await Task.Run(async () =>
        {
            if (cust.TryDequeue(out Customer temp))
            {
                // The delay must be awaited, otherwise it does nothing
                await Task.Delay(5000);
                temp.IsDone = true;
            }
        });
    }

    public void OnCustomerFinished()
    {
        // This is where I'm stuck
        DoWork(~HOW TO PASS THE QUEUE OF CUSTOMER HERE?~);
    }
}

// Edit - This is the Customer Class

public class Customer
{
    private bool _isDone = false;

    public event EventHandler<EventArgs> CustomerFinished;

    public bool IsDone
    {
        private get { return _isDone; }
        set
        {
            _isDone = value;
            if (_isDone)
            {
                OnCustomerFinished();
            }

        }
    }
    protected virtual void OnCustomerFinished()
    {
        if (CustomerFinished != null)
        {
            CustomerFinished(this, EventArgs.Empty);
        }
    }
}

Solution

  • .NET already has pub/sub and worker mechanisms in the form of Dataflow blocks and, more recently, Channels.

    Dataflow

    Dataflow blocks from the System.Threading.Tasks.Dataflow namespace are the "old" way (2012 and later) of building workers and pipelines of workers. Each block has an input and/or output buffer. Each message posted to the block is processed by one or more tasks in the background. For blocks with outputs, the output of each iteration is stored in the output buffer.

    Blocks can be combined into pipelines similar to a CMD or Powershell pipeline, with each block running on its own task(s).

    In the simplest case an ActionBlock can be used as a worker:

    void ProcessCustomer(Customer customer)
    {
        ....
    }
    
    var block = new ActionBlock<Customer>(cust => ProcessCustomer(cust));
    

    That's it. There's no need to manually dequeue or poll.

    The producer method can start sending customer instances to the block. Each of them will be processed in the background, in the order they were posted:

    foreach(var customer in bigCustomerList)
    {
        block.Post(customer);
    }
    

    When done, e.g. when the application terminates, the producer only needs to call Complete() on the block and wait for any remaining entries to complete.

    block.Complete();
    await block.Completion;
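
    The pipelining mentioned above works by linking blocks with LinkTo. Here is a minimal, self-contained sketch; the Customer and Order types and the two-stage shape are illustrative (not from the question), and it assumes the System.Threading.Tasks.Dataflow NuGet package is referenced:

    ```csharp
    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Illustrative types standing in for the question's Customer and an Order
    record Customer(string Name);
    record Order(string CustomerName);

    class PipelineDemo
    {
        static async Task Main()
        {
            // A TransformBlock produces one output per input; an ActionBlock consumes
            var createOrder = new TransformBlock<Customer, Order>(c => new Order(c.Name));
            var processOrder = new ActionBlock<Order>(o => Console.WriteLine($"Processed {o.CustomerName}"));

            // PropagateCompletion lets Complete() flow down the pipeline
            createOrder.LinkTo(processOrder, new DataflowLinkOptions { PropagateCompletion = true });

            foreach (var cust in Enumerable.Range(1, 3).Select(i => new Customer($"C{i}")))
            {
                createOrder.Post(cust);
            }

            createOrder.Complete();
            await processOrder.Completion;   // waits for both blocks to drain
        }
    }
    ```

    Completing the first block is enough; completion propagates through the link, so awaiting the last block waits for the whole pipeline.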
    

    Blocks can work with asynchronous methods too.
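
    For instance, an ActionBlock accepts an async delegate, and ExecutionDataflowBlockOptions.MaxDegreeOfParallelism controls how many customers are processed concurrently, which maps directly to the question's pool of workers. A minimal sketch with illustrative types (the worker count of 4 and the delay are arbitrary):

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Illustrative stand-in for the question's Customer
    record Customer(string Name);

    class AsyncWorkerDemo
    {
        static async Task Main()
        {
            // An async delegate; up to 4 customers are processed concurrently
            var block = new ActionBlock<Customer>(
                async cust =>
                {
                    await Task.Delay(100);   // simulate work, like the question's Task.Delay
                    Console.WriteLine($"Done: {cust.Name}");
                },
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            for (int i = 1; i <= 8; i++)
            {
                block.Post(new Customer($"C{i}"));
            }

            block.Complete();
            await block.Completion;
        }
    }
    ```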

    Channels

    Channels are a newer mechanism, built into .NET Core 3 and available as a NuGet package for previous .NET Framework and .NET Core versions. The producer writes to a channel using a ChannelWriter and the consumer reads from it using a ChannelReader. This may seem a bit strange until you realize it enables some powerful patterns.

    The producer could be something like this, e.g. a producer that "produces" all customers in a list with a 0.5 sec delay:

    ChannelReader<Customer> Producer(IEnumerable<Customer> customers, CancellationToken token = default)
    {
        //Create a channel that can buffer an unlimited number of entries
        var channel = Channel.CreateUnbounded<Customer>();
        var writer = channel.Writer;
        //Start a background task to produce the data
        _ = Task.Run(async () =>
        {
            foreach (var customer in customers)
            {
                //Exit gracefully in case of cancellation
                if (token.IsCancellationRequested)
                {
                    return;
                }
                await writer.WriteAsync(customer, token);
                await Task.Delay(500);
            }
        }, token)
        //Ensure we complete the writer no matter what
        .ContinueWith(t => writer.Complete(t.Exception));

        return channel.Reader;
    }
    

    That's a bit more involved, but notice that the only thing the function needs to return is the ChannelReader. The cancellation token is useful for terminating the producer early, e.g. after a timeout or if the application closes.

    When the writer completes, all the channel's readers will also complete.

    The consumer only needs that ChannelReader to work:

    async Task Consumer(ChannelReader<Customer> reader, CancellationToken token = default)
    {
        while (await reader.WaitToReadAsync(token))
        {
            while (reader.TryRead(out var customer))
            {
                //Process the customer
            }
        }
    }
    

    Should the writer complete, WaitToReadAsync will return false and the loop will exit.

    In .NET Core 3 the ChannelReader supports IAsyncEnumerable through the ReadAllAsync method, making the code even simpler:

    async Task Consumer(ChannelReader<Customer> reader, CancellationToken token = default)
    {
        await foreach (var customer in reader.ReadAllAsync(token))
        {
            //Process the customer
        }
    }
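
    Because each item is delivered to only one reader, multiple workers can share the same ChannelReader, which matches the question's pool of workers grabbing customers as they become free. A minimal self-contained sketch, with an int payload standing in for Customer and an arbitrary worker count of 3:

    ```csharp
    using System;
    using System.Linq;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    class FanOutDemo
    {
        static async Task Main()
        {
            var channel = Channel.CreateUnbounded<int>();

            // Producer: write 10 items, then complete the writer
            _ = Task.Run(async () =>
            {
                for (int i = 0; i < 10; i++)
                {
                    await channel.Writer.WriteAsync(i);
                }
                channel.Writer.Complete();
            });

            // Three workers share one reader; each item is consumed exactly once
            async Task Worker(int id)
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    Console.WriteLine($"Worker {id} handled {item}");
                }
            }

            await Task.WhenAll(Enumerable.Range(1, 3).Select(Worker));
        }
    }
    ```

    When the writer completes, all three workers' ReadAllAsync loops end and WhenAll returns.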
    

    The reader created by the producer can be passed directly to the consumer:

    var customers = new[] { ...... };
    var reader=Producer(customers);
    await Consumer(reader);
    

    Intermediate steps can read from a previous channel reader and publish data to the next, e.g. an order generator:

    ChannelReader<Order> CustomerOrders(ChannelReader<Customer> reader, CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<Order>();
        var writer = channel.Writer;
        //Start a background task to produce the data
        _ = Task.Run(async () =>
        {
            await foreach (var customer in reader.ReadAllAsync(token))
            {
                //Somehow create an order for the customer
                var order = new Order(...);
                await writer.WriteAsync(order, token);
            }
        }, token)
        //Ensure we complete the writer no matter what
        .ContinueWith(t => writer.Complete(t.Exception));

        return channel.Reader;
    }
    

    Again, all we need to do is pass the readers from one method to the next:

    var customers = new[] { ...... };
    var customerReader = Producer(customers);
    var orderReader = CustomerOrders(customerReader);
    await ConsumeOrders(orderReader);
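
    ConsumeOrders itself isn't shown above; it would simply mirror the Consumer method, typed for orders. A self-contained sketch with an illustrative Order type (the question never defines one):

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    // Illustrative Order type
    record Order(int Id);

    class ConsumeOrdersDemo
    {
        // Mirrors the Consumer method, just typed for Order instead of Customer
        static async Task ConsumeOrders(ChannelReader<Order> reader, CancellationToken token = default)
        {
            await foreach (var order in reader.ReadAllAsync(token))
            {
                Console.WriteLine($"Consumed order {order.Id}");
            }
        }

        static async Task Main()
        {
            // Stand-in for the upstream CustomerOrders stage
            var channel = Channel.CreateUnbounded<Order>();
            for (int i = 1; i <= 3; i++)
            {
                channel.Writer.TryWrite(new Order(i));
            }
            channel.Writer.Complete();

            await ConsumeOrders(channel.Reader);
        }
    }
    ```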