Search code examples
asp.net-corebackground-taskasp.net-core-hosted-services

Allow user to cancel task running in BackgroundService


I created a background service to process items in a queue as demonstrated here:

Queued Background Task

Unlike the example, items are added to the queue via a Web API method that looks something like this:

[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
        foreach (var item in collection) {
           //do work here
        }
    });
    return Ok();
}

I would like to provide a way for a caller to cancel a queued task in a subsequent request using a reference returned by QueueWorkItem. I tried the following, but CancellationTokenSource is not serializable. Is there some way to accomplish this?

[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;

    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
       foreach (var item in collection) {
           cancellationToken.ThrowIfCancellationRequested();
           //do work here
       }
    });
    return Ok(source);
}

[HttpPut("cancelworkitem")]
public Task<ActionResult> CancelWorkItem(CancellationTokenSource source) {
    source.Cancel();
    return Ok();
}


Solution

  • You can keep your cancellation token inside your task queue and use a unique key to find it later, for example:

    create a WorkItem class/record to keep everything in one place:

    public record WorkItem(Func<CancellationToken, ValueTask> func, string key, CancellationTokenSource tokenSource);
    

    then you can store this key/token inside a dictionary: for example:

    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly ConcurrentDictionary<string, CancellationTokenSource> _tokenSources;
        private readonly Channel<WorkItem> _queue;
    
        public BackgroundTaskQueue(int capacity = 100)
        {
            var options = new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            };
            _queue = Channel.CreateBounded<WorkItem>(options);
            _tokenSources = new ConcurrentDictionary<string, CancellationTokenSource>();
        }
    
        public bool CancelWorkItem(string key)
        {
            var exist = _tokenSources.TryRemove(key, out var tokenSource);
            if (!exist) return false; // token source not found
    
            tokenSource!.Cancel();
            return true;
        }
    
        public async ValueTask QueueBackgroundWorkItemAsync(WorkItem workItem)
        {
            _tokenSources.TryAdd(workItem.Key, workItem.TokenSource);
            await _queue.Writer.WriteAsync(workItem);
        }
    
        public async ValueTask<WorkItem> DequeueAsync(CancellationToken cancellationToken)
        {
            var workItem = await _queue.Reader.ReadAsync(cancellationToken);
            return workItem;
        }
    }
    

    At the end, you can use it this way in your API:

    [HttpGet("queueworkitem")]
        public async Task<ActionResult> QueueWorkItem()
        {
            var key = Guid.NewGuid().ToString("N");
            var tokenSource = new CancellationTokenSource();
            var cancellationToken = tokenSource.Token; 
            
            await _taskQueue.QueueBackgroundWorkItemAsync(new WorkItem(async (token) =>
            {
                while (!cancellationToken.IsCancellationRequested && !token.IsCancellationRequested)
                {
                    Console.WriteLine($"Processing ... {key}");
                    await Task.Delay(1000, cancellationToken);
                } 
            }, key, tokenSource));
            
            return Ok(key);
        }
        
        
        [HttpGet("cancelworkitem")]
        public ActionResult CancelWorkItem(string token_key)
        {
            if (_taskQueue.CancelWorkItem(token_key))
            {
                return Ok();
            }
    
            return BadRequest("Task not found");
        }
    

    This will work fine if you have only 1 server and you don't want to serialize and save your tasks in some storage. but if you need to run the tasks later, I would suggest checking out the Hangfire library which gives you exactly the same functionalities.