Tags: c#, asp.net, parallel-processing, task-parallel-library, cancellation-token

C#: Cancel a list of tasks that are executing long-running SQL queries


I need to cancel a list of tasks that are running SQL queries once a wait time expires. I can use a CancellationToken to cancel the tasks, but cancellation is cooperative, which means I have to check the token's status inside my action before every step. In my case the SQL queries are what take a long time, and I can only check the token before or after a query executes. Checking afterwards is useless, so how do I cancel the query execution inside these tasks based on the token's status?

public void EnqueueTask(Action action, CancellationToken cancelToken = default(CancellationToken))
{
    var task = new Task(action, cancelToken, TaskCreationOptions.LongRunning);
    if (_workTaskQueue.TryAdd(task))
    {
        TaskHandler?.Invoke
            (new TaskProcessingArguments
            {
                ISTaskAdded = true,
                Message = "Task Added to Queue",
                PendingTaskCount = _workTaskQueue.Count,
            });
    }
    else
    {
        TaskHandler?.Invoke
            (new TaskProcessingArguments
            {
                ISTaskAdded = false,
                Message = "Timed out while adding Task to Queue",
                PendingTaskCount = _workTaskQueue.Count,
            });
    }
}

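public void DequeueTask(int maxConcurrency, CancellationToken ct)
{
    var tasks = new List<Task>();
    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        foreach (var task in _workTaskQueue.GetConsumingEnumerable())
        {
            if (task.IsCanceled || task.Status != TaskStatus.Created)
            {
                continue;
            }

            // Wait for a free slot so that at most maxConcurrency tasks run at once.
            concurrencySemaphore.Wait();
            tasks.Add(task);
            // Release the slot when the task ends, whether it succeeds, faults, or is canceled.
            tasks.Add(task.ContinueWith(_ => concurrencySemaphore.Release()));
            task.Start();
        }
        // Wait inside the using block: the continuations still need the semaphore.
        Task.WaitAll(tasks.ToArray());
    }
}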

    void StartWorker()
    {
        Task.Factory.StartNew(() =>
        {
            try
            {
                taskQueue.DequeueTask(maxConcurrency, cancellationToken);
            }
            finally {
                lock (syncObj)
                {
                    IsCompleted = true;
                }
                //Logger.Info("Closing Worker task!!!");
            }
        }, TaskCreationOptions.LongRunning);
    }

Solution

  • You need to use the CancellationToken inside the functions that you pass as Action instances to the EnqueueTask method.

    For example, the following code shows how a CancellationToken can be used to terminate a running SQL command:

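        using (SqlConnection conn = new SqlConnection(sqlConnection))
        using (SqlCommand cmd = conn.CreateCommand())
        {
            conn.Open();
            cmd.CommandText = sql;

            // When the token is canceled, ask the server to abort the running command.
            // Disposing the registration removes the callback once the query is done.
            using (cancellationToken.Register(() => cmd.Cancel()))
            {
                // A canceled command typically surfaces here as a SqlException.
                cmd.ExecuteNonQuery();
            }
        }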
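To see how the pieces fit together with a wait time, here is a minimal sketch that runs without a database. FakeCommand, Demo and RunWithTimeout are hypothetical names made up for this illustration. FakeCommand only stands in for SqlCommand: its Execute() blocks until Cancel() is called, and it throws OperationCanceledException, whereas a real canceled SqlCommand would typically throw a SqlException instead. The token comes from a CancellationTokenSource built with a timeout, so it is canceled automatically when the wait time expires.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for SqlCommand: Execute() blocks until the work
// finishes or Cancel() is called from another thread.
public sealed class FakeCommand
{
    private readonly ManualResetEventSlim _cancelRequested = new ManualResetEventSlim(false);

    public void Cancel() => _cancelRequested.Set();

    public void Execute(TimeSpan duration)
    {
        // Wait returns true if Cancel() was called before the duration elapsed.
        if (_cancelRequested.Wait(duration))
        {
            throw new OperationCanceledException("Command was canceled.");
        }
    }
}

public static class Demo
{
    // Runs a "query" that would take 30 seconds but is canceled after the timeout.
    // Returns true if the query was canceled, false if it ran to completion.
    public static bool RunWithTimeout(TimeSpan timeout)
    {
        // The source cancels its token automatically once the timeout elapses.
        using (var cts = new CancellationTokenSource(timeout))
        {
            CancellationToken token = cts.Token;
            var cmd = new FakeCommand();

            // This is the kind of Action you would pass to EnqueueTask.
            Action action = () =>
            {
                // Same pattern as with SqlCommand: cancel the command when the token fires.
                using (token.Register(() => cmd.Cancel()))
                {
                    cmd.Execute(TimeSpan.FromSeconds(30));
                }
            };

            var task = new Task(action, token, TaskCreationOptions.LongRunning);
            task.Start();

            try
            {
                task.Wait();
                return false;
            }
            catch (AggregateException ex) when (ex.InnerException is OperationCanceledException)
            {
                return true;
            }
        }
    }

    public static void Main()
    {
        bool canceled = RunWithTimeout(TimeSpan.FromMilliseconds(200));
        Console.WriteLine(canceled ? "Query canceled" : "Query completed");
    }
}
```

The action never polls the token. The registered callback interrupts the blocking call from outside, which is why this approach works for a single long-running query where checking the token before or after the call would not help.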