c# async-await producer-consumer

Remove cancelled Task from producer/consumer queue


I want to use an async producer/consumer queue (AsyncEx lib) to send messages one at a time over a bus. Right now I achieve this simply by async blocking. It's working fine, but I have no control over the queue :(

So I came up with the following solution. The problem is that a canceled task is not removed from the queue. If I limit the queue to, say, 10 (because each message takes 1s to send and the maximum queue time should be about 10s) and the queue already contains 8 waiting tasks and 2 canceled tasks, then the next queued task would throw an InvalidOperationException, although the two canceled tasks wouldn't be sent anyway.

Maybe there is a better way to do this :D

class Program
{
    static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
        new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();

    static void Main()
    {
        StartAsync().Wait();
    }

    static async Task StartAsync()
    {
        var sendingTask = StartSendingAsync();
        var tasks = new List<Task>();

        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
        {
            for (var i = 0; i < 10; i++)
            {
                tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
            }

            try
            {
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages sent.");
            }
            catch (TaskCanceledException)
            {
                Console.WriteLine("At least one task was canceled.");
            }                
        }

        s_Queue.CompleteAdding();
        await sendingTask;
        s_Queue.Dispose();
        Console.WriteLine("Queue completed.");

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    static async Task EnqueueMessageAsync(string message, CancellationToken token)
    {

        var tcs = new TaskCompletionSource();
        using (token.Register(() => tcs.TrySetCanceled()))
        {
            await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
            Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
            await tcs.Task;
        }
    }

    static async Task SendMessageAsync(string message)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
    }

    static async Task StartSendingAsync()
    {
        while (await s_Queue.OutputAvailableAsync())
        {
            var t = await s_Queue.DequeueAsync();
            if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;

            await SendMessageAsync(t.Item1);
            t.Item2.TrySetResult();
        }
    }
}

Edit 1:

As svik pointed out, the InvalidOperationException is only thrown if the queue is already completed. So this solution doesn't even solve my initial problem of an unmanaged "queue" of waiting tasks. If there are, e.g., more than 10 calls per 10s, I get a full queue plus an additional unmanaged "queue" of waiting tasks, just like with my async blocking approach (AsyncMonitor). I guess I have to come up with some other solution then...
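
For the "fail instead of wait" part alone, one option might be a bounded channel from System.Threading.Channels (assuming .NET Core 3.0 or later). This is a different technique from the AsyncEx queue above, and it does not remove canceled entries; it only illustrates that `TryWrite` returns false immediately when the channel is full rather than waiting. `FailFastDemo` and `TryEnqueueAll` are hypothetical names used only for this sketch.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper for illustration; not part of AsyncEx or the code above.
public static class FailFastDemo
{
    // Tries to write `count` messages into a bounded channel that nobody reads from
    // and returns how many were accepted.
    public static int TryEnqueueAll(int capacity, int count)
    {
        Channel<string> channel = Channel.CreateBounded<string>(capacity);
        int accepted = 0;

        for (int i = 0; i < count; i++)
        {
            // TryWrite never waits: it returns false when the channel is full.
            if (channel.Writer.TryWrite("Message " + i))
            {
                accepted++;
            }
            else
            {
                Console.WriteLine("Message {0} rejected: queue full.", i);
            }
        }

        channel.Writer.Complete();
        return accepted;
    }
}
```

A producer could turn the `false` result into an exception of its own choosing, so the caller learns about the overflow right away instead of joining a hidden line of waiters.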

Edit 2:

I have N different producers of messages (I don't know how many there are because it's not my code) and only one consumer that sends the messages over a bus and checks if they were sent correctly (not really string messages).

The following code simulates a situation where the code should break (queue size is 10):

  1. Enqueue 10 messages (with a timeout of 5 s)
  2. Wait 5 s (messages 0-4 were sent and messages 5-9 were cancelled)
  3. Enqueue 11 new messages (without a timeout)
  4. Messages 10-19 should be enqueued because the queue only contains cancelled messages
  5. Message 20 should throw an exception (e.g. QueueOverflowException) because the queue is full; the producer code may or may not handle it
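
The admission rule in steps 4 and 5 can be sketched on its own, without any of the async machinery. This is a minimal, non-thread-safe illustration under stated assumptions: `PurgingBoundedList` is a hypothetical name, `QueueOverflowException` is defined here only for the sketch, and the capacity is treated as a strict upper bound.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical exception type for this sketch.
public sealed class QueueOverflowException : Exception
{
    public QueueOverflowException(string message) : base(message) { }
}

// Hypothetical bounded list that drops canceled entries before rejecting a new one.
// Not thread-safe; a real queue would need synchronisation around Add.
public sealed class PurgingBoundedList
{
    private readonly List<(string Message, CancellationToken Token)> _items =
        new List<(string Message, CancellationToken Token)>();
    private readonly int _capacity;

    public PurgingBoundedList(int capacity)
    {
        _capacity = capacity;
    }

    public int Count
    {
        get { return _items.Count; }
    }

    public void Add(string message, CancellationToken token)
    {
        if (_items.Count >= _capacity)
        {
            // Canceled entries will never be sent, so they should not count against the limit.
            _items.RemoveAll(item => item.Token.IsCancellationRequested);

            if (_items.Count >= _capacity)
            {
                throw new QueueOverflowException(
                    "Queue overflow; '" + message + "' couldn't be enqueued.");
            }
        }

        _items.Add((message, token));
    }
}
```

With a capacity of 10, adding ten messages that share one token, cancelling that token, and then adding ten more messages succeeds, because the first of the new adds purges the canceled entries. An eleventh new message is rejected with QueueOverflowException.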

Producers:

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
    for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
    await Task.Delay(TimeSpan.FromSeconds(5));
    for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }

    try
    {
        await Task.WhenAll(tasks);
        Console.WriteLine("All messages sent.");
    }
    catch (TaskCanceledException)
    {
        Console.WriteLine("At least one task was canceled.");
        Console.WriteLine("Press any key to complete queue...");
        Console.ReadKey();
    }
}

The goal is to have full control over all messages that should be sent. That is not the case in the code I posted before, because I only have control over the messages in the queue, not over the messages that are waiting to be enqueued. There could be 10000 messages asynchronously waiting to be enqueued and I wouldn't know, so the producer code wouldn't work as expected anyway, because it would take forever to send all the waiting messages.

I hope this makes it clearer what I want to achieve ;)


Solution

  • I'm not sure if answering my own question is OK, so I won't flag it as the answer; maybe someone comes up with a better solution :P

    First of all here is the producer code:

    static async Task StartAsync()
    {
        using (var queue = new SendMessageQueue(10, new SendMessageService()))
        using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(4.5)))
        {
            var tasks = new List<Task>();
    
            for (var i = 0; i < 10; i++)
            {
                tasks.Add(queue.SendAsync(i.ToString(), timeoutTokenSource.Token));
            }
            await Task.Delay(TimeSpan.FromSeconds(4.5));
            for (var i = 10; i < 25; i++)
            {
                tasks.Add(queue.SendAsync(i.ToString(), default(CancellationToken)));
            }
    
            await queue.CompleteSendingAsync();
    
            for (var i = 0; i < tasks.Count; i++ )
            {
                try
                {
                    await tasks[i];
                    Console.WriteLine("Message '{0}' sent.", i);
                }
                catch (TaskCanceledException)
                {
                    Console.WriteLine("Message '{0}' canceled.", i);
                }
                catch (QueueOverflowException ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }
    
    • 25 messages are enqueued over 5sec
    • 16 messages are sent
    • 3 messages are not sent (queue is full)
    • 6 messages get canceled

    And here is the "Queue" class that is based upon a List. It's a combination of the queue and the consumer. Synchronisation is done with the AsyncMonitor class (AsyncEx by Stephen Cleary).

    class SendMessageQueue : IDisposable
    {
        private bool m_Disposed;
        private bool m_CompleteSending;
        private Task m_SendingTask;
        private AsyncMonitor m_Monitor;
        private List<MessageTaskCompletionSource> m_MessageCollection;
        private ISendMessageService m_SendMessageService;
    
        public int Capacity { get; private set; }
    
    
        public SendMessageQueue(int capacity, ISendMessageService service)
        {
            Capacity = capacity;
            m_Monitor = new AsyncMonitor();
            m_MessageCollection = new List<MessageTaskCompletionSource>();
            m_SendMessageService = service;
            m_SendingTask = StartSendingAsync();
        }
    
        public async Task<bool> SendAsync(string message, CancellationToken token)
        {
            if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
            if (message == null) { throw new ArgumentNullException("message"); }
    
            using (var messageTcs = new MessageTaskCompletionSource(message, token))
            {
                await AddAsync(messageTcs);
                return await messageTcs.Task;
            }
        }
    
        public async Task CompleteSendingAsync()
        {
            if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
    
            using (await m_Monitor.EnterAsync())
            {
                m_CompleteSending = true;
                m_Monitor.Pulse(); // wake the consumer if it is waiting on an empty queue
            }
            await m_SendingTask;
        }
    
        private async Task AddAsync(MessageTaskCompletionSource message)
        {
            using (await m_Monitor.EnterAsync(message.Token))
            {
                if (m_CompleteSending) { throw new InvalidOperationException("Queue already completed."); }
                // Note: this check lets the list grow to Capacity + 1 entries before rejecting.
                if (Capacity < m_MessageCollection.Count)
                {
                    // Drop canceled entries first; they would never be sent anyway.
                    m_MessageCollection.RemoveAll(item => item.IsCanceled);
                    if (Capacity < m_MessageCollection.Count)
                    {
                        throw new QueueOverflowException(string.Format("Queue overflow; '{0}' couldn't be enqueued.", message.Message));
                    }
                }
                m_MessageCollection.Add(message);
                m_Monitor.Pulse(); // signal new message while still holding the monitor
            }
            Console.WriteLine("Thread '{0}' - {1}: '{2}' enqueued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message.Message);
        }
    
        private async Task<MessageTaskCompletionSource> TakeAsync()
        {
            using (await m_Monitor.EnterAsync())
            {
                var message = m_MessageCollection[0];
                m_MessageCollection.RemoveAt(0);
                return message;
            }
        }
    
        private async Task<bool> OutputAvailableAsync()
        {
            using (await m_Monitor.EnterAsync())
            {
                // Re-check after every wake-up: a pulse can mean a new message or completion.
                while (m_MessageCollection.Count == 0)
                {
                    if (m_CompleteSending) { return false; }
                    await m_Monitor.WaitAsync();
                }
                return true;
            }
        }
    
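        private async Task StartSendingAsync()
        {
            while (await OutputAvailableAsync())
            {
                var message = await TakeAsync();
                if (message.IsCanceled) continue; // skip messages canceled while queued
                try
                {
                    var result = await m_SendMessageService.SendMessageAsync(message.Message, message.Token);
                    message.TrySetResult(result);
                }
                // Also covers TaskCanceledException, which derives from OperationCanceledException.
                catch (OperationCanceledException) { message.TrySetCanceled(); }
                // Report any other failure to the caller instead of ending the send loop.
                catch (Exception ex) { message.TrySetException(ex); }
            }
        }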
    
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    
        protected void Dispose(bool disposing)
        {
            if (m_Disposed) return;
            if (disposing)
            {
                if (m_MessageCollection != null)
                {
                    var tmp = m_MessageCollection;
                    m_MessageCollection = null;
                    tmp.ForEach(item => item.Dispose());
                    tmp.Clear();
                }
            }
            m_Disposed = true;
        }
    
        #region MessageTaskCompletionSource Class
    
        class MessageTaskCompletionSource : TaskCompletionSource<bool>, IDisposable
        {
            private bool m_Disposed;
            private IDisposable m_CancellationTokenRegistration;
    
            public string Message { get; private set; }
            public CancellationToken Token { get; private set; }
            public bool IsCompleted { get { return Task.IsCompleted; } }
            public bool IsCanceled { get { return Task.IsCanceled; } }
            public bool IsFaulted { get { return Task.IsFaulted; } }
    
    
            public MessageTaskCompletionSource(string message, CancellationToken token)
            {
                m_CancellationTokenRegistration = token.Register(() => TrySetCanceled());
                Message = message;
                Token = token;
            }
    
            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
    
            protected void Dispose(bool disposing)
            {
                if (m_Disposed) return;
                if (disposing)
                {
                    TrySetException(new ObjectDisposedException(GetType().Name));
    
                    if (m_CancellationTokenRegistration != null)
                    {
                        var tmp = m_CancellationTokenRegistration;
                        m_CancellationTokenRegistration = null;
                        tmp.Dispose();
                    }
                }
                m_Disposed = true;
            }
        }
    
        #endregion
    }
    

    For now I'm OK with this solution; it gets the job done :D