c# producer-consumer interlocked system.threading.channels

Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full


Suppose I am provided with an event producer API consisting of Start(), Pause(), and Resume() methods, and an ItemAvailable event. The producer itself is external code, and I have no control over its threading. A few items may still come through after Pause() is called (the producer is actually remote, so items may already be in flight over the network).

Suppose also that I am writing consumer code, where consumption may be slower than production.

The critical requirements are:

  1. The consumer event handler must not block the producer thread, and
  2. All events must be processed (no data can be dropped).

I introduce a buffer into the consumer to smooth out some burstiness. But in the case of extended burstiness, I want to call Producer.Pause(), and then Resume() at an appropriate time, to avoid running out of memory at the consumer side.

I have a solution making use of Interlocked to increment and decrement a counter, which is compared to a threshold to decide whether it is time to Pause or Resume.

Question: Is there a better solution than the Interlocked counter (int current in the code below), in terms of efficiency (and elegance)?

Updated MVP (no longer bounces off the limiter):

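using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Experiments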
{
    internal class Program
    {
        // simple external producer API for demo purposes
        private class Producer
        {
            public void Pause(int i) { _blocker.Reset(); Console.WriteLine($"paused at {i}"); }
            public void Resume(int i) { _blocker.Set(); Console.WriteLine($"resumed  at {i}"); }
            public async Task Start()
            {
                await Task.Run
                (
                    () =>
                    {
                        for (int i = 0; i < 10000; i++)
                        {
                            _blocker.Wait();
                            ItemAvailable?.Invoke(this, i);
                        }
                    }
                );
            }

            public event EventHandler<int> ItemAvailable;
            private ManualResetEventSlim _blocker = new(true);
        }

        private static async Task Main(string[] args)
        {
            var p = new Producer();
            var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true });
            int threshold = 1000;
            int resumeAt = 10;
            int current = 0;
            int paused = 0;

            p.ItemAvailable += (_, i) =>
            {
                // CompareExchange(ref location, value, comparand):
                // flip paused 0 -> 1; only the caller that wins the flip pauses.
                if (Interlocked.Increment(ref current) >= threshold
                    && Interlocked.CompareExchange(ref paused, 1, 0) == 0
                ) p.Pause(i);

                buffer.Writer.TryWrite(i);
            };

            var processor = Task.Run
            (
                async () =>
                {
                    await foreach (int i in buffer.Reader.ReadAllAsync())
                    {
                        Console.WriteLine($"processing {i}");
                        await Task.Delay(10);
                        // flip paused 1 -> 0; only the caller that wins the flip resumes.
                        if
                        (
                            Interlocked.Decrement(ref current) < resumeAt
                            && Interlocked.CompareExchange(ref paused, 0, 1) == 1
                        ) p.Resume(i);
                    }
                }
            );

            p.Start();
            await processor;
        }
    }
}
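
The paused flag in the code above depends on the argument order of Interlocked.CompareExchange(ref location, value, comparand): the new value comes second and the expected old value comes third, and the return value is whatever was stored before the call. A minimal sketch of that flag in isolation, where the PauseFlag class and its method names are hypothetical and only there for illustration:

```csharp
using System;
using System.Threading;

// Hypothetical helper: a 0/1 flag where exactly one caller wins each transition.
public sealed class PauseFlag
{
    private int _paused; // 0 = running, 1 = paused

    // True only for the caller that flips the flag from 0 to 1.
    public bool TryEnterPaused()
        => Interlocked.CompareExchange(ref _paused, 1, 0) == 0;

    // True only for the caller that flips the flag from 1 to 0.
    public bool TryLeavePaused()
        => Interlocked.CompareExchange(ref _paused, 0, 1) == 1;
}

public static class PauseFlagDemo
{
    public static void Main()
    {
        var flag = new PauseFlag();
        Console.WriteLine(flag.TryEnterPaused()); // True: 0 -> 1
        Console.WriteLine(flag.TryEnterPaused()); // False: already 1
        Console.WriteLine(flag.TryLeavePaused()); // True: 1 -> 0
        Console.WriteLine(flag.TryLeavePaused()); // False: already 0
    }
}
```

With this shape, the handler would call Pause only when TryEnterPaused returns true, and the consumer would call Resume only when TryLeavePaused returns true.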

Solution

  • If you are aiming for elegance, you could consider baking the pressure-awareness functionality into a custom Channel<T>. Below is a PressureAwareUnboundedChannel<T> class that derives from Channel<T>. It offers all the functionality of the base class, and in addition it emits a notification when the channel comes under pressure and another when the pressure is relieved. The notifications are pushed through an IProgress<bool> instance, which reports true when the number of buffered items rises above a high threshold and false when it drops to or below a low threshold.

    public sealed class PressureAwareUnboundedChannel<T> : Channel<T>
    {
        private readonly Channel<T> _channel;
        private readonly int _highPressureThreshold;
        private readonly int _lowPressureThreshold;
        private readonly IProgress<bool> _pressureProgress;
        private int _pressureState = 0; // 0: no pressure, 1: under pressure
    
        public PressureAwareUnboundedChannel(int lowPressureThreshold,
            int highPressureThreshold, IProgress<bool> pressureProgress)
        {
            if (lowPressureThreshold < 0)
                throw new ArgumentOutOfRangeException(nameof(lowPressureThreshold));
            if (highPressureThreshold < lowPressureThreshold)
                throw new ArgumentOutOfRangeException(nameof(highPressureThreshold));
            if (pressureProgress == null)
                throw new ArgumentNullException(nameof(pressureProgress));
            _highPressureThreshold = highPressureThreshold;
            _lowPressureThreshold = lowPressureThreshold;
            _pressureProgress = pressureProgress;
            _channel = Channel.CreateBounded<T>(Int32.MaxValue);
            this.Writer = new ChannelWriter(this);
            this.Reader = new ChannelReader(this);
        }
    
        private class ChannelWriter : ChannelWriter<T>
        {
            private readonly PressureAwareUnboundedChannel<T> _parent;
    
            public ChannelWriter(PressureAwareUnboundedChannel<T> parent)
                => _parent = parent;
            public override bool TryComplete(Exception error = null)
                => _parent._channel.Writer.TryComplete(error);
            public override bool TryWrite(T item)
            {
                bool success = _parent._channel.Writer.TryWrite(item);
                if (success) _parent.SignalWriteOrRead();
                return success;
            }
            public override ValueTask<bool> WaitToWriteAsync(
                CancellationToken cancellationToken = default)
                    => _parent._channel.Writer.WaitToWriteAsync(cancellationToken);
        }
    
        private class ChannelReader : ChannelReader<T>
        {
            private readonly PressureAwareUnboundedChannel<T> _parent;
    
            public ChannelReader(PressureAwareUnboundedChannel<T> parent)
                => _parent = parent;
            public override Task Completion => _parent._channel.Reader.Completion;
            public override bool CanCount => _parent._channel.Reader.CanCount;
            public override int Count => _parent._channel.Reader.Count;
            public override bool TryRead(out T item)
            {
                bool success = _parent._channel.Reader.TryRead(out item);
                if (success) _parent.SignalWriteOrRead();
                return success;
            }
            public override ValueTask<bool> WaitToReadAsync(
                CancellationToken cancellationToken = default)
                    => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
        }
    
        private void SignalWriteOrRead()
        {
            var currentCount = _channel.Reader.Count;
            bool underPressure;
            if (currentCount > _highPressureThreshold)
                underPressure = true;
            else if (currentCount <= _lowPressureThreshold)
                underPressure = false;
            else
                return;
            int newState = underPressure ? 1 : 0;
            int oldState = underPressure ? 0 : 1;
            if (Interlocked.CompareExchange(
                ref _pressureState, newState, oldState) != oldState) return;
            _pressureProgress.Report(underPressure);
        }
    }
    

    The encapsulated Channel<T> is actually a bounded channel, having capacity equal to the maximum Int32 value, because only bounded channels implement the Reader.Count property.¹

    Usage example:

    var progress = new Progress<bool>(underPressure =>
    {
        if (underPressure) Producer.Pause(); else Producer.Resume();
    });
    var channel = new PressureAwareUnboundedChannel<Item>(500, 1000, progress);
    

    In this example the Producer will be paused when the number of items stored in the channel exceeds 1000, and it will be resumed when that number drops to 500 or fewer.
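
    To see the two thresholds at work without a channel, here is a minimal sketch that feeds a rising and then falling item count through the same rule that SignalWriteOrRead applies. The PressureGate and PressureGateDemo names are hypothetical and are not part of the class above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: the hysteresis rule of SignalWriteOrRead, minus the channel.
public sealed class PressureGate
{
    private readonly int _low, _high;
    private int _state; // 0: no pressure, 1: under pressure

    public PressureGate(int low, int high) { _low = low; _high = high; }

    // Returns true/false when the state flips, or null when nothing changes.
    public bool? Observe(int count)
    {
        bool underPressure;
        if (count > _high) underPressure = true;
        else if (count <= _low) underPressure = false;
        else return null; // between the thresholds: keep the current state
        int newState = underPressure ? 1 : 0;
        int oldState = underPressure ? 0 : 1;
        if (Interlocked.CompareExchange(ref _state, newState, oldState) != oldState)
            return null; // already in the target state
        return underPressure;
    }
}

public static class PressureGateDemo
{
    public static List<string> Run()
    {
        var gate = new PressureGate(500, 1000);
        var log = new List<string>();
        for (int n = 1; n <= 1200; n++) Record(gate.Observe(n), n, log);  // buffer fills
        for (int n = 1199; n >= 0; n--) Record(gate.Observe(n), n, log);  // buffer drains
        return log;
    }

    private static void Record(bool? change, int n, List<string> log)
    {
        if (!change.HasValue) return;
        string label = change.Value ? "pause" : "resume";
        log.Add($"{label} at {n}");
    }

    // Prints: pause at 1001, resume at 500
    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

    The gap between the two thresholds is what keeps the producer from being paused and resumed on every single item once the buffer hovers near the limit.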

    The Progress<bool> action is invoked on the context that was captured when the Progress<bool> was created. So if you create it on the UI thread of a GUI application, the action will be invoked on the UI thread; otherwise it will be invoked on the ThreadPool. In the latter case there is no protection against overlapping invocations of the Action<bool>. If the Producer class is not thread-safe, you'll have to add synchronization inside the handler. Example:

    var progress = new Progress<bool>(underPressure =>
    {
        lock (Producer) if (underPressure) Producer.Pause(); else Producer.Resume();
    });
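
    When no context is captured, Progress<bool> queues each report to the ThreadPool, so two reports issued close together are not guaranteed to run in the order they were issued. One possible alternative, assuming Pause() and Resume() return quickly, is an IProgress<T> implementation that runs the handler inline on the reporting thread. The SynchronousProgress<T> class below is a hypothetical helper and not part of the BCL:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: invokes the handler directly on the thread that calls Report.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _handler;

    public SynchronousProgress(Action<T> handler)
        => _handler = handler ?? throw new ArgumentNullException(nameof(handler));

    public void Report(T value) => _handler(value);
}

public static class SynchronousProgressDemo
{
    public static List<bool> Run()
    {
        var seen = new List<bool>();
        IProgress<bool> progress = new SynchronousProgress<bool>(seen.Add);
        progress.Report(true);   // handler has already run when Report returns
        progress.Report(false);
        return seen;
    }

    // Prints: True, False
    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

    The trade-off is that the handler now runs on whichever thread called TryWrite or TryRead, which for a pause notification is the producer's thread, so it must not block. The writer and the reader can still report from different threads at the same time, so the lock shown above remains relevant when the Producer is not thread-safe.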
    

    ¹ Actually unbounded channels also support the Count property, unless they are configured with the SingleReader option.
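
    Whether a particular channel supports counting can be checked at run time through Reader.CanCount, on runtimes where that property is available. A minimal sketch, in which the CountSupportDemo class and its Describe method are hypothetical names:

```csharp
using System;
using System.Threading.Channels;

public static class CountSupportDemo
{
    // Writes one item, then reports whether the reader can count and what it counts.
    public static string Describe(string name, Channel<int> channel)
    {
        channel.Writer.TryWrite(42);
        bool canCount = channel.Reader.CanCount;
        string count = canCount ? channel.Reader.Count.ToString() : "n/a";
        return $"{name}: CanCount={canCount}, Count={count}";
    }

    public static void Main()
    {
        Console.WriteLine(Describe("unbounded",
            Channel.CreateUnbounded<int>()));
        Console.WriteLine(Describe("unbounded, SingleReader",
            Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = true })));
        Console.WriteLine(Describe("bounded, Int32.MaxValue",
            Channel.CreateBounded<int>(int.MaxValue)));
    }
}
```

    If CanCount turns out to be true for the configuration you need, the wrapped channel could be created with Channel.CreateUnbounded<T>() instead of the bounded channel with Int32.MaxValue capacity.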