c# .net async-await task-queue cancellation

Pausable & resumable async task queue


I've implemented a work queue based on the approach in "Task queue for wp8?", but I'm having trouble adding functionality to it.

I've taken out the Func<Task>s and replaced them with ICommands (each holding its own CancellationTokenSource, exposed as Cts), and I intend to add Pause(), Resume(), Save() and Restore() methods. That way, in OnFormClose() I can pause queue processing and ask the user whether to "wait for the queue to finish" (i.e. resume) or "exit now" (i.e. save and exit).

public class WqController
{
    private readonly Queue<ICommand> _queue = new Queue<ICommand>();
    private Task _queueProcessor;
    private ICommand _curCommand;

    public void Enqueue(ICommand command)
    {
        _queue.Enqueue(command);

        if (_queueProcessor == null) _queueProcessor = ProcessQueue();
    }

    private async Task ProcessQueue()
    {
        try
        {
            while (_queue.Count != 0)
            {
                _curCommand = _queue.Peek();

                try
                {
                    await Task.Run(() => _curCommand.Execute());
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("QUEUE PAUSED");
                    return;
                }
                catch (Exception)
                {
                    Console.WriteLine("FAILED TO EXECUTE COMMAND");
                }
                _queue.Dequeue();
            }
        }
        finally
        {
            _queueProcessor = null;
            _curCommand = null;
        }
    }

    public async Task Cancel()
    {
        _curCommand.Cts.Cancel(true);
        await _queueProcessor;
    }

    public void Resume()
    {
        _queueProcessor = ProcessQueue();
    }
}

Save() and Restore() work fine, so I haven't included them here. Cancel() works only intermittently, and Resume() doesn't seem to work at all (which confuses me, since it attempts the same restart that works in Enqueue()).


Solution

  • I got this working and thought I should outline my solution here.

    It turns out my use of cancellation tokens was a little haphazard, which prevented this class from working as intended. Two issues in particular were relevant:

    1. If Cancel was called after the command had passed its last cancellation check, the command would finish normally, the next command would be loaded (with its own, uncancelled token), and the cancel request would be lost. I solved this by adding if (_curCommand.Cts.Token.IsCancellationRequested) return; right after _queue.Dequeue();.

    2. After a cancel, a command that is resumed later needs a fresh cancellation token; otherwise the existing one, with cancellation already requested, would still be in effect. The line _curCommand.InvalidateCancellationToken(); handles this by setting the token to null, and my command creates a new token the next time it is called.
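
    The ICommand implementation is not shown in the post, so the following is only a minimal sketch of how the token refresh in point 2 might look. The interface shape, the CountingCommand class and its Completed property are assumptions made for illustration; only Cts, Execute() and InvalidateCancellationToken() are names taken from the code in this answer.

```csharp
using System;
using System.Threading;

// Hypothetical interface: the post only shows that commands expose
// Cts, Execute() and InvalidateCancellationToken().
public interface ICommand
{
    CancellationTokenSource Cts { get; }
    void Execute();
    void InvalidateCancellationToken();
}

// Illustrative command that counts up to a fixed number of steps.
public class CountingCommand : ICommand
{
    private readonly object _gate = new object();
    private readonly int _steps;
    private CancellationTokenSource _cts;
    private int _next; // progress is kept, so a resumed command continues where it stopped

    public CountingCommand(int steps) { _steps = steps; }

    public int Completed { get { return _next; } }

    // Lazily (re)creates the source, so a resumed command never sees
    // the stale source that already has cancellation requested.
    public CancellationTokenSource Cts
    {
        get
        {
            lock (_gate)
            {
                if (_cts == null) _cts = new CancellationTokenSource();
                return _cts;
            }
        }
    }

    // Drops the cancelled source; the next access to Cts creates a fresh one.
    public void InvalidateCancellationToken()
    {
        lock (_gate) { _cts = null; }
    }

    public void Execute()
    {
        CancellationToken token = Cts.Token;
        while (_next < _steps)
        {
            // Cooperative check: throws OperationCanceledException when cancelled.
            token.ThrowIfCancellationRequested();
            _next++;
        }
    }
}
```

    With a command shaped like this, the queue can cancel through Cts, call InvalidateCancellationToken() in its catch block, and run the same command again on resume without it being cancelled immediately.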

    The full code I used:

    public class WqController
    {
        private readonly Queue<ICommand> _queue = new Queue<ICommand>();
        private Task _queueProcessor;
        private ICommand _curCommand;
    
        public void Enqueue(ICommand command)
        {
            _queue.Enqueue(command);
    
            if (_queueProcessor == null) _queueProcessor = ProcessQueue();
        }
    
        private async Task ProcessQueue()
        {
            try
            {
                while (_queue.Count != 0)
                {
                    _curCommand = _queue.Peek();
    
                    try
                    {
                        await Task.Run(() => _curCommand.Execute());
                    }
                    catch (OperationCanceledException)
                    {
                        _curCommand.InvalidateCancellationToken();
                        Console.WriteLine("QUEUE PAUSED");
                        return;
                    }
                    catch (Exception)
                    {
                        Console.WriteLine("FAILED TO EXECUTE COMMAND");
                    }
                    _queue.Dequeue();
                    if (_curCommand.Cts.Token.IsCancellationRequested) return;
                }
            }
            finally
            {
                _queueProcessor = null;
                _curCommand = null;
            }
        }
    
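        public async Task Cancel()
        {
            // Capture both fields first: ProcessQueue() resets them to null when it ends.
            var command = _curCommand;
            var processor = _queueProcessor;
            if (command == null || processor == null) return; // queue is idle

            command.Cts.Cancel(true);
            await processor;
        }
    
        public void Resume()
        {
            // Restart only when there is work left and no processor is already running.
            if (_queue.Count != 0 && _queueProcessor == null) _queueProcessor = ProcessQueue();
        }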
    }
    

    This all seems to work very smoothly now, and it is a big improvement on the BackgroundWorker-based queue implementation I had been using before.
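
    For the form-closing scenario from the question, the pause / prompt / resume-or-save decision might be wired up roughly as follows. This is a sketch under assumptions: CloseFlow and ConfirmCloseAsync are hypothetical names, and the queue operations are passed in as delegates because Save() is not shown in the post and its signature is unknown.

```csharp
using System;
using System.Threading.Tasks;

public static class CloseFlow
{
    // Returns true if the application may exit, false if it should stay open.
    public static async Task<bool> ConfirmCloseAsync(
        Func<Task> pause,           // e.g. () => controller.Cancel()
        Action resume,              // e.g. controller.Resume
        Action save,                // e.g. controller.Save (assumed signature)
        Func<bool> userWantsToWait) // e.g. a MessageBox prompt
    {
        // Pause first, so nothing runs while the user decides.
        await pause();

        if (userWantsToWait())
        {
            // "Wait for the queue to finish": restart processing and stay open.
            resume();
            return false;
        }

        // "Exit now": persist the remaining commands and allow the exit.
        save();
        return true;
    }
}
```

    In a WinForms FormClosing handler, the close would typically have to be vetoed (e.Cancel = true) before the first await, because the framework does not wait for an async handler to finish; the form can then be closed explicitly once this method returns true.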