I created a background service to process items in a queue as demonstrated here:
Unlike the example, items are added to the queue via a Web API method that looks something like this:
[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
        foreach (var item in collection) {
            //do work here
        }
    });
    return Ok();
}
I would like to provide a way for a caller to cancel a queued task in a subsequent request using a reference returned by QueueWorkItem. I tried the following, but CancellationTokenSource is not serializable. Is there some way to accomplish this?
[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;
    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
        foreach (var item in collection) {
            cancellationToken.ThrowIfCancellationRequested();
            //do work here
        }
    });
    return Ok(source);
}
[HttpPut("cancelworkitem")]
public ActionResult CancelWorkItem(CancellationTokenSource source) {
    source.Cancel();
    return Ok();
}
You can keep the CancellationTokenSource inside your task queue and look it up later by a unique key. For example, first create a WorkItem class/record to keep everything in one place:
public record WorkItem(Func<CancellationToken, ValueTask> Func, string Key, CancellationTokenSource TokenSource);
Then store the key and its token source in a dictionary inside the queue, for example:
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    // Maps each work item's key to the source that can cancel it.
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _tokenSources;
    private readonly Channel<WorkItem> _queue;

    public BackgroundTaskQueue(int capacity = 100)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            // Writers wait asynchronously when the queue is full.
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<WorkItem>(options);
        _tokenSources = new ConcurrentDictionary<string, CancellationTokenSource>();
    }

    public bool CancelWorkItem(string key)
    {
        var exist = _tokenSources.TryRemove(key, out var tokenSource);
        if (!exist) return false; // token source not found
        tokenSource!.Cancel();
        return true;
    }

    public async ValueTask QueueBackgroundWorkItemAsync(WorkItem workItem)
    {
        // Register the token source before the item becomes visible to the consumer.
        _tokenSources.TryAdd(workItem.Key, workItem.TokenSource);
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<WorkItem> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}
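The queue above never removes a key once its item has finished, and an item cancelled while it is still waiting would still be dequeued. A minimal sketch of how the consuming side could handle both cases follows. `SketchQueue`, `EnqueueAsync`, `Cancel`, `ProcessAllAsync`, `TrackedCount`, and `Demo` are hypothetical names used only for illustration; they are not part of the original answer or of any library, and the unbounded channel is just a simplification.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public record WorkItem(Func<CancellationToken, ValueTask> Func, string Key, CancellationTokenSource TokenSource);

// Hypothetical trimmed-down queue, just enough to make the sketch self-contained.
public class SketchQueue
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _tokenSources = new();
    private readonly Channel<WorkItem> _queue = Channel.CreateUnbounded<WorkItem>();

    // Number of keys that can still be cancelled.
    public int TrackedCount => _tokenSources.Count;

    public async ValueTask EnqueueAsync(WorkItem item)
    {
        _tokenSources.TryAdd(item.Key, item.TokenSource);
        await _queue.Writer.WriteAsync(item);
    }

    public bool Cancel(string key)
    {
        if (!_tokenSources.TryRemove(key, out var source)) return false;
        source.Cancel();
        return true;
    }

    public void CompleteAdding() => _queue.Writer.Complete();

    // Consumer loop: runs items one at a time until the channel is completed.
    public async Task ProcessAllAsync(CancellationToken stoppingToken)
    {
        await foreach (var item in _queue.Reader.ReadAllAsync(stoppingToken))
        {
            try
            {
                // Skip items that were cancelled while still waiting in the queue.
                if (!item.TokenSource.IsCancellationRequested)
                    await item.Func(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected when the item or the host is cancelled mid-run.
            }
            finally
            {
                // Forget the key so the dictionary does not grow without bound.
                _tokenSources.TryRemove(item.Key, out _);
            }
        }
    }
}

public static class Demo
{
    // Queues three items, cancels one before it runs, then drains the queue.
    public static async Task<(int Ran, int Tracked)> RunAsync()
    {
        var queue = new SketchQueue();
        var ran = 0;
        foreach (var key in new[] { "a", "b", "c" })
        {
            await queue.EnqueueAsync(new WorkItem(
                _ => { ran++; return ValueTask.CompletedTask; },
                key,
                new CancellationTokenSource()));
        }
        queue.Cancel("b");
        queue.CompleteAdding();
        await queue.ProcessAllAsync(CancellationToken.None);
        return (ran, queue.TrackedCount);
    }
}
```

Calling `await Demo.RunAsync()` runs items "a" and "c", skips the cancelled "b", and leaves no keys tracked. In a real hosted service the same try/finally would sit inside `ExecuteAsync` around each dequeued item.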
Finally, you can use it like this in your API:
[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem()
{
    // The key is what the caller sends back later to cancel this item.
    var key = Guid.NewGuid().ToString("N");
    var tokenSource = new CancellationTokenSource();
    var cancellationToken = tokenSource.Token;
    await _taskQueue.QueueBackgroundWorkItemAsync(new WorkItem(async (token) =>
    {
        // Stop when either this item is cancelled or the service is shutting down.
        while (!cancellationToken.IsCancellationRequested && !token.IsCancellationRequested)
        {
            Console.WriteLine($"Processing ... {key}");
            // Throws TaskCanceledException if the item is cancelled during the delay.
            await Task.Delay(1000, cancellationToken);
        }
    }, key, tokenSource));
    return Ok(key);
}
[HttpPut("cancelworkitem")]
public ActionResult CancelWorkItem(string token_key)
{
    if (_taskQueue.CancelWorkItem(token_key))
    {
        return Ok();
    }
    return BadRequest("Task not found");
}
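The work item above has to watch two tokens: the one passed in by the background service and the one owned by the item itself. One way to simplify this, sketched here as an assumption rather than as part of the original answer, is to merge them with `CancellationTokenSource.CreateLinkedTokenSource` so the body only checks a single token. `LinkedTokenDemo` and `RunUntilCancelledAsync` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedTokenDemo
{
    // Loops until either token is cancelled and returns the number of completed iterations.
    public static async Task<int> RunUntilCancelledAsync(
        CancellationToken stoppingToken, CancellationToken itemToken)
    {
        // The linked source is cancelled as soon as either input token is cancelled.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, itemToken);
        var iterations = 0;
        try
        {
            while (true)
            {
                linked.Token.ThrowIfCancellationRequested();
                await Task.Delay(10, linked.Token); // stands in for real work
                iterations++;
            }
        }
        catch (OperationCanceledException)
        {
            // Either the caller cancelled this item or the host is shutting down.
        }
        return iterations;
    }
}
```

Inside the queued lambda this would look like `LinkedTokenDemo.RunUntilCancelledAsync(token, tokenSource.Token)`. Catching `OperationCanceledException` also covers the `TaskCanceledException` that `Task.Delay` throws when cancelled mid-delay.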
Keeping the token sources in memory works fine if you have only one server and don't need to serialize and persist your tasks in some storage. If you need tasks to run later or survive a restart, I would suggest checking out the Hangfire library, which provides this kind of functionality out of the box.