c# blockingcollection .net-6.0 worker-service

BlockingCollection<T> in a BackgroundService causes high CPU usage


I have a .NET BackgroundService for managing notifications by using a BlockingCollection<Notification>.

My implementation causes high CPU usage, even though there is not much work for the BlockingCollection to handle.

I've collected some dumps and it seems that I am running into thread pool starvation.

I am not sure how this should be refactored to avoid this situation.

private readonly BlockingCollection<Notification> _notifications;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Task.Run(async () =>
        {
            await _notificationsContext.Database.MigrateAsync(stoppingToken);

            while (!stoppingToken.IsCancellationRequested)
            {

                foreach (var notification in _notifications.GetConsumingEnumerable(stoppingToken))
                {
                   // process notification
                }


            }
        }, stoppingToken);
    }

I've also tried to remove the while loop but the issue still persists.

[Image: core dump screenshot]

EDIT: Added the producer

 public abstract class CommandHandlerBase
    {
        private readonly BlockingCollection<Notification> _notifications;

        public CommandHandlerBase(BlockingCollection<Notification> notifications)
        {
            _notifications = notifications;
        }
        protected void EnqueueNotification(AlertImapact alertImapact,
                                           AlertUrgency alertUrgency,
                                           AlertSeverity alertServerity,
                                           string accountName,
                                           string summary,
                                           string details,
                                           bool isEnabled,
                                           Exception exception,
                                           CancellationToken cancellationToken = default)
        {

            var notification = new Notification(accountName, summary, details, DateTime.UtcNow, exception.GetType().ToString())
            {
                Imapact = alertImapact,
                Urgency = alertUrgency,
                Severity = alertServerity,
                IsSilenced = !isEnabled,
            };

            _notifications.Add(notification, cancellationToken);
        }
    }

Solution

  • Blocking is expensive, but putting a thread to sleep and rescheduling it is even more expensive. To avoid this, .NET typically starts blocking operations with a SpinWait before actually blocking the thread. A spin wait keeps a core busy doing nothing for a short while, which causes the CPU usage you observed.

    To fix this, use an asynchronous collection like Channels.

    • A channel lets you asynchronously post messages to it and read messages from it, preserving their order.
    • It's thread-safe, which means multiple readers and writers can use it at the same time.
    • You can create a bounded channel to stop publishers from posting while the channel is full.
    • Finally, you can read all messages in a Channel through an IAsyncEnumerable, which makes the processing code simpler.
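
To make the bounded-channel point concrete, here is a minimal sketch rather than anything taken from the question's code. The capacity of 2 and the int payload are assumptions chosen for illustration. It shows that TryWrite refuses an item when the channel is full, while WriteAsync waits asynchronously instead of blocking a thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity of 2 is an arbitrary value picked for this illustration.
var channel = Channel.CreateBounded<int>(2);

// TryWrite never waits: it returns false when the channel is full.
bool first = channel.Writer.TryWrite(1);
bool second = channel.Writer.TryWrite(2);
bool third = channel.Writer.TryWrite(3);   // rejected, both slots are taken

// WriteAsync returns a pending ValueTask instead of blocking the thread.
ValueTask pending = channel.Writer.WriteAsync(3);
bool writeIsWaiting = !pending.IsCompleted;

// Reading one item frees a slot, which lets the pending write finish.
int head = await channel.Reader.ReadAsync();
await pending;

// Drain what is left; items come out in the order they were written.
var rest = new List<int>();
while (channel.Reader.TryRead(out var item))
    rest.Add(item);

Console.WriteLine($"{first} {second} {third} {writeIsWaiting} {head} [{string.Join(", ", rest)}]");
// prints "True True False True 1 [2, 3]"
```

With the default full mode, a producer that awaits WriteAsync is slowed down to the consumer's pace without tying up a thread pool thread.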

    Avoid blocking with Channels

    In your case, the code could change to this:

    private readonly Channel<Notification> _notifications = Channel.CreateUnbounded<Notification>();

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _notificationsContext.Database.MigrateAsync(stoppingToken);

        await foreach (var notification in _notifications.Reader.ReadAllAsync(stoppingToken))
        {
            // process notification
        }
    }
    

    Channels intentionally use separate types for reading and writing. To read, you use the ChannelReader returned by Channel.Reader. To write, you use the ChannelWriter returned by Channel.Writer. A Channel can be implicitly converted to either type, which makes it easy to write publisher and subscriber methods that only accept or return a ChannelReader or ChannelWriter.
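
The reader/writer split can be sketched as follows. ProduceAsync and ConsumeAsync are hypothetical helper names, not part of the question's code, and the string payload is only for illustration. Each method receives just the half of the channel it needs, relying on the implicit conversion.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

// Channel<T> converts implicitly to ChannelWriter<T> and ChannelReader<T>,
// so the same object can be passed to both methods.
Task producer = ProduceAsync(channel);
int count = await ConsumeAsync(channel);
await producer;

Console.WriteLine(count);   // prints "3"

// Hypothetical producer: it can write but has no way to read.
static async Task ProduceAsync(ChannelWriter<string> writer)
{
    foreach (var text in new[] { "a", "b", "c" })
        await writer.WriteAsync(text);

    writer.Complete();
}

// Hypothetical consumer: it can read but has no way to write.
static async Task<int> ConsumeAsync(ChannelReader<string> reader)
{
    var seen = 0;
    await foreach (var item in reader.ReadAllAsync())
        seen++;

    return seen;
}
```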

    To write to the channel you use ChannelWriter's WriteAsync method:

    await _notifications.Writer.WriteAsync(someNotification);
    

    When you're done writing and want to close the channel, you need to call Complete() on the writer:

    _notifications.Writer.Complete();
    

    The processing loop will still read any remaining messages. To wait until it finishes, await the ChannelReader.Completion task:

    await _notifications.Reader.Completion;
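
Put together, the write, complete, and drain sequence might look like the sketch below. It uses an int payload and a Task.Run consumer purely for illustration. In the question's service, the loop would live in ExecuteAsync instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
var processed = new List<int>();

// Consumer loop: ReadAllAsync ends once the channel is completed and empty.
Task consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        processed.Add(item);
});

for (var i = 1; i <= 3; i++)
    await channel.Writer.WriteAsync(i);

// Complete() returns void: it only marks the channel as closed for writing.
channel.Writer.Complete();

// Writes attempted after completion are rejected.
bool accepted = channel.Writer.TryWrite(4);

// Completion finishes once the channel is closed and every item has been read.
await channel.Reader.Completion;

// Also await the loop itself so the last item has been fully processed.
await consumer;

Console.WriteLine($"{accepted} [{string.Join(", ", processed)}]");
// prints "False [1, 2, 3]"
```

Reader.Completion only signals that the channel has been drained, so awaiting the consumer task as well is the safer way to know that processing of the last item has finished.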
    

    Posting from other classes

    When you work with a BackgroundService, notifications will typically arrive from other classes. This means that both the publisher and the service need access to the same Channel. One way to do this is to use a helper class and inject it into both the publisher and the service.

    The MessageChannel<T> class does this and also handles application termination by closing the writer:

    using System;
    using System.Threading;
    using System.Threading.Channels;
    using Microsoft.Extensions.Hosting;

    public class MessageChannel<T> : IDisposable
    {
        private readonly Channel<T> _channel;
        private readonly CancellationTokenSource _cts = new();

        // Channel<T> converts implicitly to its reader and writer halves.
        public ChannelReader<T> Reader => _channel;
        public ChannelWriter<T> Writer => _channel;

        public CancellationToken CancellationToken => _cts.Token;

        public MessageChannel(IHostApplicationLifetime lifetime)
        {
            _channel = Channel.CreateBounded<T>(1);
            // Close the writer when the host starts shutting down so readers can drain and exit.
            lifetime.ApplicationStopping.Register(() => Writer.TryComplete());
        }

        public void Stop()
        {
            _cts.Cancel();
        }

        public void Dispose()
        {
            _cts.Dispose();
        }
    }
    

    This can be injected in the background service:

    private readonly MessageChannel<Notification> _notifications;
    private readonly ChannelReader<Notification> _reader;

    public MyService(MessageChannel<Notification> notifications)
    {
        _notifications = notifications;
        _reader = notifications.Reader;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _notificationsContext.Database.MigrateAsync(stoppingToken);

        await foreach (var notification in _reader.ReadAllAsync(stoppingToken))
        {
            // process notification
        }
    }
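
On the publisher side, the question's CommandHandlerBase could take the writer half instead of a BlockingCollection. The following is only a sketch under stated assumptions: the Notification record is a trimmed-down stand-in for the real type, SampleCommandHandler and EnqueueNotificationAsync are hypothetical names, and a plain bounded channel is used in place of the injected helper so the example runs on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for the shared channel that would normally come from dependency injection.
var channel = Channel.CreateBounded<Notification>(1);

// The channel converts implicitly to the ChannelWriter the handler expects.
var handler = new SampleCommandHandler(channel);
await handler.HandleAsync("contoso");

Notification received = await channel.Reader.ReadAsync();
Console.WriteLine($"{received.AccountName}: {received.Summary}");
// prints "contoso: Command failed"

// Hypothetical, trimmed-down stand-in for the Notification type in the question.
public sealed record Notification(string AccountName, string Summary);

public abstract class CommandHandlerBase
{
    private readonly ChannelWriter<Notification> _writer;

    protected CommandHandlerBase(ChannelWriter<Notification> writer)
    {
        _writer = writer;
    }

    // WriteAsync waits asynchronously if a bounded channel is full,
    // so no thread is blocked while the consumer catches up.
    protected ValueTask EnqueueNotificationAsync(string accountName,
                                                 string summary,
                                                 CancellationToken cancellationToken = default)
    {
        return _writer.WriteAsync(new Notification(accountName, summary), cancellationToken);
    }
}

// Hypothetical concrete handler used only to exercise the base class.
public sealed class SampleCommandHandler : CommandHandlerBase
{
    public SampleCommandHandler(ChannelWriter<Notification> writer) : base(writer) { }

    public async Task HandleAsync(string accountName)
    {
        await EnqueueNotificationAsync(accountName, "Command failed");
    }
}
```

Depending only on ChannelWriter keeps handlers from reading or completing the channel by accident. The helper class would need to be registered as a singleton so the publisher and the service share one instance.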