How to run multiple long-running commands simultaneously, except for commands with the same parameters?


I have a BackgroundService (IHostedService) that implements ExecuteAsync similar to the example in Implement background tasks in microservices with IHostedService and the BackgroundService class.

I want to run several tasks simultaneously, each executing a long-running command in an external system. I want the service to keep executing these tasks until it is stopped, but I don't want to execute the command with the same parameters at the same time (if it's executing with item.parameter1 = 123, I want it to wait until 123 is done, then execute with 123 again). Also, it should not block and it should not leak memory. If an exception occurs, I'd like to stop the offending task gracefully, log it, and restart it. Each execution of the command gets different parameters, so something like this:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var items = GetItems(); //GetItems returns a List<Item>

    _logger.LogDebug($"ExternalCommandService is starting.");

    stoppingToken.Register(() => 
            _logger.LogDebug($" Execute external command background task is stopping."));

    while (!stoppingToken.IsCancellationRequested)
    {
        _logger.LogDebug($"External command task is doing background work.");

        //Execute the command with values from the item
        foreach(var item in items)
        {
             ExecuteExternalCommand(item);
        }

        await Task.Delay(_settings.CheckUpdateTime, stoppingToken);
    }
    
    _logger.LogDebug($"Execute external command background task is stopping.");

}

The structure of the data is pretty simple:

public class MyData
{
    public string Foo { get; set; }
    public string Blee { get; set; }
}

I'm a complete newbie when it comes to Task development so please forgive my lack of understanding. Would it make more sense to make ExecuteExternalCommand asynchronous? I'm not certain that these tasks are being executed in parallel. What am I doing wrong? How do I accomplish the other requirements like exception handling and graceful restarting of the tasks?


Solution

  • You have data that you want grouped by a particular value, with each group then run repeatedly in its own loop. I will give an example of one way of doing this that hopefully you can lean on to get what you need for your answer.

    Note: This example may not be useful based on how the data is mocked. If you provide the proper data structure in your question I'll update the answer to match.

    You have data.

    public struct Data
    {
        public Data(int value) => Value = value;
        public int Value { get; }
        public string Text => $"{nameof(Data)}: {Value}";
    }
    

    You want the data grouped by a particular value. (Note: the grouping in this example is somewhat artificial, because the data is grouped by Value, which is already unique. It is included only for illustration.)

    There are many ways to do this, but for this example I'll use LINQ's Distinct with an IEqualityComparer<T> that compares Data.Value.

    public class DataDistinction : IEqualityComparer<Data>
    {
        public bool Equals(Data x, Data y) => x.Value == y.Value;
        public int GetHashCode(Data obj) => obj.Value.GetHashCode();
    }
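
Note that Distinct keeps only one item per key. If several items can share a key and each of them still has to run (one after another, never at the same time), grouping may be a closer fit. What follows is only a minimal sketch under assumptions: the Item type, its Parameter property, the GroupedRunner class and the execute delegate (a stand-in for the external command) are hypothetical names, not taken from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical item type; Parameter is the value that must never run concurrently.
public class Item
{
    public Item(int parameter, string payload)
    {
        Parameter = parameter;
        Payload = payload;
    }

    public int Parameter { get; }
    public string Payload { get; }
}

public static class GroupedRunner
{
    // Runs every group on its own task; items inside one group run one at a time.
    public static Task RunAsync(
        IEnumerable<Item> items,
        Func<Item, CancellationToken, Task> execute, // stand-in for the external command
        CancellationToken token)
    {
        var groupTasks = items
            .GroupBy(item => item.Parameter)
            .Select(group => Task.Run(async () =>
            {
                foreach (var item in group)
                {
                    token.ThrowIfCancellationRequested();
                    await execute(item, token); // sequential within the same parameter
                }
            }, token))
            .ToList(); // materialize the query so every group starts right away

        // Completes when all groups are done; faults if any group threw.
        return Task.WhenAll(groupTasks);
    }
}
```

Different parameters run in parallel, while two items with the same Parameter are awaited in sequence, so they cannot overlap.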
    

    Mock the Data

    For this example I'll mock some data.

    var dataItems = new List<Data>();
    
    for (var i = 0; i < 10; i++)
    {
        dataItems.Add(new Data(i));
    }
    

    Process the Data

    For this example I'll use the IEqualityComparer<T> to get each Data uniquely by Value and start the processing.

    private static void ProcessData(List<Data> dataItems)
    {
        var groupedDataItems = dataItems.Distinct(new DataDistinction());
    
        foreach (var data in groupedDataItems)
        {
            LoopData(data); //We start unique loops here.
        }
    }
    

    Loop the Unique Data

    For this example I chose to start a new Task using Task.Factory. This is one of the cases where using it makes sense (the loop is long-running), though typically you don't need it. I pass in an Action<object>, where the object represents the state (parameter) given to the Task. Inside the task I cast the state back to Data and start a while loop that monitors the CancellationTokenSource for cancellation. For convenience I declared the CancellationTokenSource as a static field, which you will see in the full Console App at the bottom.

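    private static void LoopData(Data data)
    {
        // Start a dedicated task for this item; `data` is handed in as the task's state object.
        Task.Factory.StartNew((state) =>
        {
            var taskData = (Data)state; // cast the state back to Data
            // Keep looping until cancellation is requested.
            while (!cancellationTokenSource.IsCancellationRequested)
            {
                Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
                Task.Delay(100).Wait(); // pause 100 ms between iterations (blocks this task's thread)
            }
        },
        data,
        cancellationTokenSource.Token,
        TaskCreationOptions.LongRunning, // hint that this work is long-running
        TaskScheduler.Default);
    }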
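
The question also asks about logging failures and restarting the work. One possible shape for that, as a sketch only: wrap each iteration in try/catch, log the exception, wait a little, and continue the loop. The ResilientLoop class, the work and log delegates, and the interval and retryDelay parameters are all hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ResilientLoop
{
    // Repeats `work` until cancellation is requested.
    // A failure is logged and the loop resumes after `retryDelay`.
    public static async Task RunAsync(
        Func<CancellationToken, Task> work, // stand-in for one execution of the command
        Action<Exception> log,              // stand-in for the logger
        TimeSpan interval,
        TimeSpan retryDelay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                await work(token);
                await Task.Delay(interval, token);
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                break; // normal shutdown, not an error
            }
            catch (Exception ex)
            {
                log(ex); // record the failure, then try again later
                try
                {
                    await Task.Delay(retryDelay, token);
                }
                catch (OperationCanceledException)
                {
                    break; // cancelled while waiting to retry
                }
            }
        }
    }
}
```

Because the method returns a Task and awaits Task.Delay with the token, it does not block a thread while waiting and stops promptly when the token is cancelled.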
    

    Now I put it all into a single Console App you can copy and paste and explore.

    This will take your data, break it up, and run each item in its own Task indefinitely until the program is ended.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    namespace Question_Answer_Console_App
    {
        class Program
        {
            private static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            static void Main(string[] args)
            {
                var dataItems = new List<Data>();
    
                for (var i = 0; i < 10; i++)
                {
                    dataItems.Add(new Data(i));
                }   
    
                ProcessData(dataItems);
                Console.ReadKey();
                cancellationTokenSource.Cancel();
                Console.WriteLine("CANCELLING...");
                Console.ReadKey();
            }
    
            private static void ProcessData(List<Data> dataItems)
            {
                var groupedDataItems = dataItems.Distinct(new DataDistinction());
    
                foreach (var data in groupedDataItems)
                {
                    LoopData(data);
                }
            }
    
            private static void LoopData(Data data)
            {
                Task.Factory.StartNew((state) =>
                {
                    var taskData = (Data)state;
                    while (!cancellationTokenSource.IsCancellationRequested)
                    {
                        Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
                        Task.Delay(100).Wait();
                    }
                },
                data,
                cancellationTokenSource.Token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
            }
    
            ~Program() => cancellationTokenSource?.Dispose();
        }
    
        public struct Data
        {
            public Data(int value) => Value = value;
            public int Value { get; }
            public string Text => $"{nameof(Data)}: {Value}";
        }
    
        public class DataDistinction : IEqualityComparer<Data>
        {
            public bool Equals(Data x, Data y) => x.Value == y.Value;
            public int GetHashCode(Data obj) => obj.Value.GetHashCode();
        }
    }
    //OUTPUT UNTIL CANCELLED
    //Value: 0        Text: Data: 0
    //Value: 3        Text: Data: 3
    //Value: 1        Text: Data: 1
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 5        Text: Data: 5
    //Value: 6        Text: Data: 6
    //Value: 7        Text: Data: 7
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 5        Text: Data: 5
    //Value: 1        Text: Data: 1
    //Value: 7        Text: Data: 7
    //Value: 4        Text: Data: 4
    //Value: 0        Text: Data: 0
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 6        Text: Data: 6
    //Value: 5        Text: Data: 5
    //Value: 3        Text: Data: 3
    //Value: 8        Text: Data: 8
    //Value: 4        Text: Data: 4
    //Value: 1        Text: Data: 1
    //Value: 2        Text: Data: 2
    //Value: 9        Text: Data: 9
    //Value: 7        Text: Data: 7
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 8        Text: Data: 8
    //Value: 5        Text: Data: 5
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 1        Text: Data: 1
    //Value: 4        Text: Data: 4
    //Value: 9        Text: Data: 9
    //Value: 7        Text: Data: 7
    //Value: 0        Text: Data: 0
    //Value: 6        Text: Data: 6
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 5        Text: Data: 5
    //Value: 8        Text: Data: 8
    //Value: 7        Text: Data: 7
    //Value: 9        Text: Data: 9
    //Value: 1        Text: Data: 1
    //Value: 4        Text: Data: 4
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 5        Text: Data: 5
    //Value: 7        Text: Data: 7
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 4        Text: Data: 4
    //Value: 1        Text: Data: 1
    //Value: 0        Text: Data: 0
    //Value: 6        Text: Data: 6
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 1        Text: Data: 1
    //Value: 9        Text: Data: 9
    //Value: 5        Text: Data: 5
    //Value: 8        Text: Data: 8
    //Value: 7        Text: Data: 7
    //Value: 4        Text: Data: 4
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 4        Text: Data: 4
    //Value: 9        Text: Data: 9
    //Value: 1        Text: Data: 1
    //Value: 7        Text: Data: 7
    //Value: 8        Text: Data: 8
    //Value: 5        Text: Data: 5
    //Value: 0        Text: Data: 0
    //Value: 6        Text: Data: 6
    //Value: 4        Text: Data: 4
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 5        Text: Data: 5
    //Value: 7        Text: Data: 7
    //Value: 9        Text: Data: 9
    //Value: 8        Text: Data: 8
    //Value: 1        Text: Data: 1
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 3        Text: Data: 3
    //Value: 5        Text: Data: 5
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 7        Text: Data: 7
    //Value: 1        Text: Data: 1
    //Value: 0        Text: Data: 0
    //Value: 6        Text: Data: 6
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 3        Text: Data: 3
    //Value: 1        Text: Data: 1
    //Value: 7        Text: Data: 7
    //Value: 5        Text: Data: 5
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    // CANCELLING...
    

    In the comments you asked how to write to the console when the Task is complete. You can await the Task in this example, and once it completes you can print that Data. async void is generally discouraged for good reason (the caller cannot await it or observe its exceptions), but for a fire-and-forget call like this one it keeps the example simple.

    Here's the updated LoopData with the async await signature.

    private static async void LoopData(Data data)
    {
        await Task.Factory.StartNew((state) =>
        {
            var taskData = (Data)state;
            while (!cancellationTokenSource.IsCancellationRequested)
            {
                Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
                Task.Delay(100).Wait();
            }
        },
        data,
        cancellationTokenSource.Token,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    
        Console.WriteLine($"Task Complete: {data.Value} : {data.Text}");
    }
    
    //OUTPUT
    //Value: 0        Text: Data: 0
    //Value: 1        Text: Data: 1
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 5        Text: Data: 5
    //Value: 6        Text: Data: 6
    //Value: 7        Text: Data: 7
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 1        Text: Data: 1
    //Value: 5        Text: Data: 5
    //Value: 4        Text: Data: 4
    //Value: 7        Text: Data: 7
    //Value: 9        Text: Data: 9
    //Value: 8        Text: Data: 8
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 5        Text: Data: 5
    //Value: 1        Text: Data: 1
    //Value: 6        Text: Data: 6
    //Value: 9        Text: Data: 9
    //Value: 8        Text: Data: 8
    //Value: 7        Text: Data: 7
    //Value: 0        Text: Data: 0
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 1        Text: Data: 1
    //Value: 4        Text: Data: 4
    //Value: 5        Text: Data: 5
    //Value: 9        Text: Data: 9
    //Value: 6        Text: Data: 6
    //Value: 7        Text: Data: 7
    //Value: 8        Text: Data: 8
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 1        Text: Data: 1
    //Value: 5        Text: Data: 5
    //Value: 4        Text: Data: 4
    //Value: 8        Text: Data: 8
    //Value: 7        Text: Data: 7
    //Value: 9        Text: Data: 9
    //Value: 6        Text: Data: 6
    //Value: 0        Text: Data: 0
    //Value: 3        Text: Data: 3
    //Value: 2        Text: Data: 2
    //Value: 4        Text: Data: 4
    //Value: 1        Text: Data: 1
    //Value: 5        Text: Data: 5
    //Value: 8        Text: Data: 8
    //Value: 9        Text: Data: 9
    //Value: 6        Text: Data: 6
    //Value: 7        Text: Data: 7
    //Value: 0        Text: Data: 0
    //Value: 2        Text: Data: 2
    //Value: 3        Text: Data: 3
    //Value: 5        Text: Data: 5
    //Value: 4        Text: Data: 4
    //Value: 1        Text: Data: 1
    //Value: 8        Text: Data: 8
    //Value: 7        Text: Data: 7
    //Value: 6        Text: Data: 6
    //Value: 9        Text: Data: 9
    // CANCELLING...
    //Task Complete: 0 : Data: 0
    //Task Complete: 2 : Data: 2
    //Task Complete: 3 : Data: 3
    //Task Complete: 1 : Data: 1
    //Task Complete: 5 : Data: 5
    //Task Complete: 4 : Data: 4
    //Task Complete: 8 : Data: 8
    //Task Complete: 6 : Data: 6
    //Task Complete: 7 : Data: 7
    //Task Complete: 9 : Data: 9
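
If the caller needs to know when every loop has finished, an alternative to async void is to return the tasks and combine them with Task.WhenAll. This is a sketch under assumptions: AwaitableLoops, RunAllAsync and the onTick callback are hypothetical names, and plain int values stand in for Data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitableLoops
{
    // Starts one long-running loop per value and returns a task
    // that completes once all loops have stopped.
    public static Task RunAllAsync(
        IEnumerable<int> values,
        Action<int> onTick, // stand-in for the per-iteration work
        CancellationToken token)
    {
        var loops = values.Select(value => Task.Factory.StartNew(() =>
        {
            while (!token.IsCancellationRequested)
            {
                onTick(value);
                // Sleeps up to 100 ms but wakes early when cancellation is requested.
                token.WaitHandle.WaitOne(100);
            }
        },
        CancellationToken.None, // token is checked inside the loop, so the task ends normally
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default)).ToList(); // ToList starts every loop immediately

        return Task.WhenAll(loops);
    }
}
```

A caller can then cancel the token and await (or Wait on) the returned task before printing a completion message, and any exception thrown by onTick surfaces through that task instead of being lost.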