c# task-parallel-library tpl-dataflow cancellationtokensource cancellation-token

Cancelling specific task in a dataflow ActionBlock


After spending all day searching, I came across a comment saying that TPL Dataflow's built-in cancellation cancels an entire block, not an individual task/item. My main stumbling block: how do I cancel Task #5 if my ActionBlock is running 10 tasks in parallel? There are quite a few cancellation examples out there, but they all use a single cancellation token and end up cancelling Task #5 and all the tasks after it.

Here is some very stripped-down code. I am guessing I have to pass something along with "i", but it is unclear to me how to programmatically create unique cancellation tokens, especially since I may need anywhere from 1 to 10 of them in this particular example. Even if I create a cancellation token in the mouse-click method, do I then have to check it against existing tokens to make sure it is unique? So confused.

        // Define the action block
        ActionBlock<int> throttle = new ActionBlock<int>(
            action: i=>DoStuff(i),
            dataflowBlockOptions: new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 10,
                CancellationToken = cts.Token
            });

        // Create the work set (pretend it is a mouse-click method... it will not be a for loop)
        for (int i = 0; i < work.Length; i++)
        {
            Console.WriteLine($"{i:D3} : Posting Work Item {i}.");
            throttle.Post(i);
        }

Solution

  • To stop items individually, you need a list of cancellation token sources so that each one can be signalled separately. If you want one per element in work, you can project over it with Select:

    var sources = work.Select( w => new CancellationTokenSource()).ToList();
    

    Then, you have to modify DoStuff to check the token:

    void DoStuff(int i, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            //Do the rest of the stuff
        }
    }
    

    And you need to pass it when you call it:

    ActionBlock<int> throttle = new ActionBlock<int>(
        action: i => DoStuff(i, sources[i].Token),  // <--- modified; assumes item i is also its index in sources
        dataflowBlockOptions: new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10,
            CancellationToken = cts.Token
        });
    

    Then to cancel an individual task, call:

    sources[i].Cancel();
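
For the mouse-click case in the question, where items arrive one at a time rather than from a fixed array, the same idea can be sketched with a dictionary keyed by item id instead of a pre-built list. This is only a minimal sketch under some assumptions: the class `CancellableWorkQueue` and its `Post`, `Cancel`, `CancelAll` and `CompleteAsync` members are hypothetical names, not part of TPL Dataflow, and each item is assumed to have a unique `int` id. Every `CancellationTokenSource` is already a distinct object, so the tokens themselves never need a uniqueness check; only the id used as the key has to be unique.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not a TPL Dataflow type): one CancellationTokenSource per posted id.
public sealed class CancellableWorkQueue
{
    private readonly ConcurrentDictionary<int, CancellationTokenSource> _sources =
        new ConcurrentDictionary<int, CancellationTokenSource>();
    private readonly CancellationTokenSource _blockCts = new CancellationTokenSource();
    private readonly ActionBlock<int> _block;

    public CancellableWorkQueue(Func<int, CancellationToken, Task> work, int maxParallelism = 10)
    {
        _block = new ActionBlock<int>(async id =>
        {
            if (!_sources.TryGetValue(id, out var cts)) return;
            try
            {
                await work(id, cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Swallowed so that one cancelled item does not fault the whole block.
            }
            finally
            {
                _sources.TryRemove(id, out _);
                cts.Dispose();
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            CancellationToken = _blockCts.Token // still allows cancelling the entire block
        });
    }

    // Call from the click handler. Returns false if the id is already pending
    // or the block no longer accepts items.
    public bool Post(int id)
    {
        // Linked to the block-level source, so CancelAll() also reaches running items.
        var cts = CancellationTokenSource.CreateLinkedTokenSource(_blockCts.Token);
        if (!_sources.TryAdd(id, cts)) { cts.Dispose(); return false; }
        if (_block.Post(id)) return true;
        _sources.TryRemove(id, out _);
        cts.Dispose();
        return false;
    }

    // Cancels only the item with this id. Returns false if it is unknown or already finished.
    public bool Cancel(int id)
    {
        if (!_sources.TryGetValue(id, out var cts)) return false;
        try { cts.Cancel(); return true; }
        catch (ObjectDisposedException) { return false; } // finished in the meantime
    }

    public void CancelAll() => _blockCts.Cancel();

    public Task CompleteAsync()
    {
        _block.Complete();
        return _block.Completion;
    }
}
```

With this shape, the click handler would call `Post(id)` for each new item and `Cancel(5)` to stop only item 5, while the other items keep running. As in the answer above, the work delegate still has to observe the token it is given (by checking `IsCancellationRequested` or passing it to cancellable calls); cancellation is cooperative and does not abort code that ignores the token.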