
C# Abortable Asynchronous Fifo Queue - leaking massive amounts of memory


I need to process data from a producer in FIFO fashion with the ability to abort processing if the same producer produces a new bit of data.

So I implemented one abortable FIFO queue based on Stephen Cleary's AsyncCollection (called AsyncCollectionAbortableFifoQueue in my sample) and another based on TPL Dataflow's BufferBlock (BufferBlockAbortableAsyncFifoQueue in my sample). Here's the implementation based on AsyncCollection:

public class AsyncCollectionAbortableFifoQueue<T> : IExecutableAsyncFifoQueue<T>
{
    private AsyncCollection<AsyncWorkItem<T>> taskQueue = new AsyncCollection<AsyncWorkItem<T>>();
    private readonly CancellationToken stopProcessingToken;

    public AsyncCollectionAbortableFifoQueue(CancellationToken cancelToken)
    {
        stopProcessingToken = cancelToken;
        _ = processQueuedItems();
    }

    public Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken)
    {
        var tcs = new TaskCompletionSource<T>();
        var item = new AsyncWorkItem<T>(tcs, action, cancelToken);
        taskQueue.Add(item);
        return tcs.Task;
    }

    protected virtual async Task processQueuedItems()
    {
        while (!stopProcessingToken.IsCancellationRequested)
        {
            try
            {
                var item = await taskQueue.TakeAsync(stopProcessingToken).ConfigureAwait(false);
                if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
                    item.TaskSource.SetCanceled();
                else
                {
                    try
                    {
                        T result = await item.Action().ConfigureAwait(false);
                        item.TaskSource.SetResult(result);   // Indicate completion
                    }
                    catch (Exception ex)
                    {
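                        // Report either cancellation or failure, never both:
                        // completing the same TaskCompletionSource twice throws InvalidOperationException.
                        if (ex is OperationCanceledException oce && oce.CancellationToken == item.CancelToken)
                            item.TaskSource.SetCanceled();
                        else
                            item.TaskSource.SetException(ex);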
                    }
                }
            }
            catch (Exception) { }
        }
    }
}

public interface IExecutableAsyncFifoQueue<T>
{
    Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken);
}

processQueuedItems is the task that dequeues AsyncWorkItems from the queue and executes them unless cancellation has been requested.

The asynchronous action to execute gets wrapped into an AsyncWorkItem, which looks like this:

internal class AsyncWorkItem<T>
{
    public readonly TaskCompletionSource<T> TaskSource;
    public readonly Func<Task<T>> Action;
    public readonly CancellationToken? CancelToken;

    public AsyncWorkItem(TaskCompletionSource<T> taskSource, Func<Task<T>> action, CancellationToken? cancelToken)
    {
        TaskSource = taskSource;
        Action = action;
        CancelToken = cancelToken;
    }
}

A single task then dequeues the items and either processes them or cancels them if their CancellationToken has been triggered.

That all works just fine: data gets processed, and if a new piece of data is received, processing of the old one is aborted. My problem is that these queues leak massive amounts of memory if I crank up the usage (the producer producing a lot more than the consumer processes). Since the queue is abortable, data that is not processed should be discarded and eventually disappear from memory.

So let's look at how I'm using these queues. I have a 1:1 match of producer and consumer: every consumer handles the data of a single producer. Whenever I get a new data item that doesn't match the previous one, I fetch the queue for the given producer (User.UserId) or create a new one (the 'executor' in the code snippet). I also have a ConcurrentDictionary that holds one CancellationTokenSource per producer/consumer combo. If there's a previous CancellationTokenSource, I call Cancel on it and Dispose it 20 seconds later (immediate disposal would cause exceptions in the queue). I then enqueue processing of the new data. The queue returns a task that I can await, so I know when processing of the data is complete, and I then return the result.

Here's that in code:

internal class SimpleLeakyConsumer
{
    private ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>> groupStateChangeExecutors = new ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>>();
    private readonly ConcurrentDictionary<string, CancellationTokenSource> userStateChangeAborters = new ConcurrentDictionary<string, CancellationTokenSource>();
    protected CancellationTokenSource serverShutDownSource;
    private readonly int operationDuration = 1000;

    internal SimpleLeakyConsumer(CancellationTokenSource serverShutDownSource, int operationDuration)
    {
        this.serverShutDownSource = serverShutDownSource;
        this.operationDuration = operationDuration * 1000; // convert from seconds to milliseconds
    }

    internal async Task<bool> ProcessStateChange(string userId)
    {
        var executor = groupStateChangeExecutors.GetOrAdd(userId, new AsyncCollectionAbortableFifoQueue<bool>(serverShutDownSource.Token));
        CancellationTokenSource oldSource = null;
        using (var cancelSource = userStateChangeAborters.AddOrUpdate(userId, new CancellationTokenSource(), (key, existingValue) =>
        {
            oldSource = existingValue;
            return new CancellationTokenSource();
        }))
        {
            if (oldSource != null && !oldSource.IsCancellationRequested)
            {
                oldSource.Cancel();
                _ = delayedDispose(oldSource);
            }
            try
            {
                var executionTask = executor.EnqueueTask(async () => { await Task.Delay(operationDuration, cancelSource.Token).ConfigureAwait(false); return true; }, cancelSource.Token);
                var result = await executionTask.ConfigureAwait(false);
                userStateChangeAborters.TryRemove(userId, out var aborter);
                return result;
            }
            catch (Exception e)
            {
                if (e is TaskCanceledException || e is OperationCanceledException)
                    return true;
                else
                {
                    userStateChangeAborters.TryRemove(userId, out var aborter);
                    return false;
                }
            }
        }
    }

    private async Task delayedDispose(CancellationTokenSource src)
    {
        try
        {
            await Task.Delay(20 * 1000).ConfigureAwait(false);
        }
        finally
        {
            try
            {
                src.Dispose();
            }
            catch (ObjectDisposedException) { }
        }
    }
}

In this sample implementation, all the consumer does is wait and then return true.

To test this mechanism, I wrote the following data producer class:

internal class SimpleProducer
{

    //variables defining the test
    readonly int nbOfusers = 10;
    readonly int minimumDelayBetweenTest = 1; // seconds
    readonly int maximumDelayBetweenTests = 6; // seconds
    readonly int operationDuration = 3; // number of seconds an operation takes in the tester

    private readonly Random rand;
    private List<User> users;
    private readonly SimpleLeakyConsumer consumer;

    protected CancellationTokenSource serverShutDownSource, testAbortSource;
    private CancellationToken internalToken = CancellationToken.None;

    internal SimpleProducer()
    {
        rand = new Random();
        testAbortSource = new CancellationTokenSource();
        serverShutDownSource = new CancellationTokenSource();
        generateTestObjects(nbOfusers, 0, false);
        consumer = new SimpleLeakyConsumer(serverShutDownSource, operationDuration);
    }

    internal void StartTests()
    {
        if (internalToken == CancellationToken.None || internalToken.IsCancellationRequested)
        {
            internalToken = testAbortSource.Token;
            foreach (var user in users)
                _ = setNewUserPresence(internalToken, user);
        }
    }

    internal void StopTests()
    {
        testAbortSource.Cancel();
        try
        {
            testAbortSource.Dispose();
        }
        catch (ObjectDisposedException) { }
        testAbortSource = new CancellationTokenSource();
    }

    internal void Shutdown()
    {
        serverShutDownSource.Cancel();
    }

    private async Task setNewUserPresence(CancellationToken token, User user)
    {
        while (!token.IsCancellationRequested)
        {
            var nextInterval = rand.Next(minimumDelayBetweenTest, maximumDelayBetweenTests);
            try
            {
                await Task.Delay(nextInterval * 1000, testAbortSource.Token).ConfigureAwait(false);
            }
            catch (TaskCanceledException)
            {
                break;
            }
            //now randomly generate a new state and submit it to the tester class
            UserState? status;
            var nbStates = Enum.GetValues(typeof(UserState)).Length;
            if (user.CurrentStatus == null)
            {
                var newInt = rand.Next(nbStates);
                status = (UserState)newInt;
            }
            else
            {
                do
                {
                    var newInt = rand.Next(nbStates);
                    status = (UserState)newInt;
                }
                while (status == user.CurrentStatus);
            }
            _ = sendUserStatus(user, status.Value);
        }
    }

    private async Task sendUserStatus(User user, UserState status)
    {
        await consumer.ProcessStateChange(user.UserId).ConfigureAwait(false);
    }

    private void generateTestObjects(int nbUsers, int nbTeams, bool addAllUsersToTeams = false)
    {
        users = new List<User>();
        for (int i = 0; i < nbUsers; i++)
        {
            var usr = new User
            {
                UserId = $"User_{i}",
                Groups = new List<Team>()
            };
            users.Add(usr);
        }
    }
}

It uses the variables at the beginning of the class to control the test. You can define the number of users (nbOfusers; every user is a producer that produces new data), the minimum (minimumDelayBetweenTest) and maximum (maximumDelayBetweenTests) delay before a user produces its next data item, and how long it takes the consumer to process the data (operationDuration).

StartTests starts the actual test, and StopTests stops the tests again.

I'm calling these as follows:

static void Main(string[] args)
{
    var tester = new SimpleProducer();
    Console.WriteLine("Test successfully started, type exit to stop");
    string str;
    do
    {
        str = Console.ReadLine();
        if (str == "start")
            tester.StartTests();
        else if (str == "stop")
            tester.StopTests();
    }
    while (str != "exit");
    tester.Shutdown();
}

So, if I run my tester and type 'start', the Producer class starts producing states that are consumed by the Consumer, and memory usage starts to grow and grow and grow. The sample is configured to the extreme. The real-life scenario I'm dealing with is less intensive, but one action of the producer can trigger multiple actions on the consumer side, which also have to be executed in the same asynchronous, abortable FIFO fashion. In the worst case, one set of produced data triggers an action for ~10 consumers (I stripped that last part out for brevity).

Say I have 100 producers, and each producer produces a new data item every 1-6 seconds (the interval is random, and so is the data produced). Consuming the data takes 3 seconds, so there are plenty of cases where a new set of data arrives before the old one has been fully processed.

Looking at two consecutive memory dumps, it's obvious where the memory usage is coming from: it's all fragments that have to do with the queue. Given that I'm disposing every CancellationTokenSource and not keeping any references to the produced data (or the AsyncWorkItem it is put into), I'm at a loss to explain why this keeps eating up my memory, and I'm hoping somebody else can show me the error of my ways. You can also abort testing by typing 'stop'. You'll see that memory stops being eaten, but even if you pause and trigger a GC, the memory is not freed either.

The source code of the project in runnable form is on GitHub. After starting it, you have to type start (plus enter) in the console to tell the producer to start producing data, and you can stop producing data by typing stop (plus enter).


Solution

  • Your code has so many issues that it is practically impossible to find the leak through debugging. Here are several things that are already problems and should be fixed first:

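    It looks like getQueue creates a new queue for the same user each time processUserStateUpdateAsync gets called. The argument is evaluated before GetOrAdd runs, so a queue is constructed on every call even when one already exists for that user: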

    var executor = groupStateChangeExecutors.GetOrAdd(user.UserId, getQueue());
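
The effect of passing a freshly constructed value to GetOrAdd can be checked in isolation. The following is a minimal sketch, not the project's code: CountingQueue is a hypothetical stand-in for a queue whose constructor does real work (in the question's sample, the constructor starts the processQueuedItems loop), and wrapping the value in Lazy<T> is one common way to make sure only a single instance per key is ever built.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for a queue type; it only counts how often it is constructed.
public sealed class CountingQueue
{
    private static int created;
    public static int Created => Volatile.Read(ref created);
    public static void Reset() => Interlocked.Exchange(ref created, 0);
    public CountingQueue() => Interlocked.Increment(ref created);
}

public static class GetOrAddDemo
{
    // Eager form: the second argument is evaluated before GetOrAdd runs,
    // so a CountingQueue is constructed on every call, even when the key exists.
    public static int EagerConstructions(int calls)
    {
        CountingQueue.Reset();
        var map = new ConcurrentDictionary<string, CountingQueue>();
        for (int i = 0; i < calls; i++)
            map.GetOrAdd("User_0", new CountingQueue());
        return CountingQueue.Created;
    }

    // Lazy form: the factory only builds a cheap Lazy<T> wrapper; the queue itself
    // is constructed once, when .Value is first read on the stored wrapper.
    public static int LazyConstructions(int calls)
    {
        CountingQueue.Reset();
        var map = new ConcurrentDictionary<string, Lazy<CountingQueue>>();
        for (int i = 0; i < calls; i++)
        {
            CountingQueue queue = map
                .GetOrAdd("User_0", key => new Lazy<CountingQueue>(() => new CountingQueue()))
                .Value;
        }
        return CountingQueue.Created;
    }

    public static void Main()
    {
        Console.WriteLine(EagerConstructions(5)); // prints 5
        Console.WriteLine(LazyConstructions(5));  // prints 1
    }
}
```

The plain factory overload, GetOrAdd(key, k => new CountingQueue()), already avoids the construction when the key exists. The Lazy<T> wrapper additionally covers the case where several threads race on a missing key and the factory runs more than once.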
    

    A CancellationTokenSource leaks on each call of the code below, because a new instance is created every time AddOrUpdate is called, whether or not it ends up in the dictionary. It should not be passed that way:

    userStateChangeAborters.AddOrUpdate(user.UserId, new CancellationTokenSource(), (key, existingValue
    

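    Also, the update callback below should return the same CancellationTokenSource that you pass as the add value for the case where the dictionary has no entry for the given user.UserId, instead of creating yet another one: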

    return new CancellationTokenSource();
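
One way to act on the two points above is to allocate a single source per call and hand it to both AddOrUpdate callbacks. This is a sketch under assumptions rather than a drop-in fix: AborterRegistry, Renew and Remove are hypothetical names, and disposing the replaced source right after cancelling it assumes that consumers were given a CancellationToken and never read the Token property of the source afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper that owns one CancellationTokenSource per user.
public sealed class AborterRegistry
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> aborters =
        new ConcurrentDictionary<string, CancellationTokenSource>();

    public int Count => aborters.Count;

    // Installs a fresh source for userId, cancels and disposes the one it replaced,
    // and returns the new token.
    public CancellationToken Renew(string userId)
    {
        var fresh = new CancellationTokenSource();   // the only allocation in this call
        CancellationToken token = fresh.Token;       // read the token before anyone can dispose the source
        CancellationTokenSource replaced = null;

        // Both callbacks return the same instance. They may run more than once
        // under contention, so each run overwrites 'replaced'.
        aborters.AddOrUpdate(
            userId,
            key => { replaced = null; return fresh; },
            (key, existing) => { replaced = existing; return fresh; });

        if (replaced != null)
        {
            replaced.Cancel();
            replaced.Dispose();
        }
        return token;
    }

    // Removes and disposes the source stored for userId, if there is one.
    public bool Remove(string userId)
    {
        if (!aborters.TryRemove(userId, out var aborter))
            return false;
        aborter.Dispose();
        return true;
    }

    public static void Main()
    {
        var registry = new AborterRegistry();
        CancellationToken first = registry.Renew("User_0");
        CancellationToken second = registry.Renew("User_0");
        Console.WriteLine(first.IsCancellationRequested);   // prints True
        Console.WriteLine(second.IsCancellationRequested);  // prints False
        Console.WriteLine(registry.Remove("User_0"));       // prints True
    }
}
```

Remove also shows the pattern for the TryRemove calls mentioned further down: whatever comes out of the dictionary gets disposed by the code that removed it.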
    

    There is also a potential leak of the cancelSource variable, because it is captured by a delegate that can live longer than you want. It is better to pass a concrete CancellationToken there:

    executor.EnqueueTask(() => processUserStateUpdateAsync(user, state, previousState,
                        cancelSource.Token));
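
The difference between capturing the source and capturing a token can be shown with a small sketch. TokenCaptureDemo and its two methods are hypothetical names. Note that a CancellationToken still refers to its source internally, so copying the token mainly avoids touching a source that has been disposed in the meantime; it does not by itself make the source collectable earlier.

```csharp
using System;
using System.Threading;

public static class TokenCaptureDemo
{
    // Captures the source: the Token property is read only when the delegate runs,
    // which may be after the source has been disposed.
    public static Func<bool> CaptureSource(CancellationTokenSource source) =>
        () => source.Token.IsCancellationRequested;

    // Captures a token copied up front: the delegate never touches the source's
    // Token property again.
    public static Func<bool> CaptureToken(CancellationTokenSource source)
    {
        CancellationToken token = source.Token;
        return () => token.IsCancellationRequested;
    }

    public static void Main()
    {
        var cts = new CancellationTokenSource();
        Func<bool> late = CaptureSource(cts);
        Func<bool> early = CaptureToken(cts);

        cts.Cancel();
        cts.Dispose();

        Console.WriteLine(early()); // prints True: the copied token still reports cancellation

        try
        {
            late();
        }
        catch (ObjectDisposedException)
        {
            // Reading Token on a disposed source throws.
            Console.WriteLine("source was already disposed");
        }
    }
}
```

This is probably also why the question found that disposing a source immediately caused exceptions in the queue: the enqueued lambda reads cancelSource.Token only when the queue finally runs it.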
    

    For some reason you do not dispose aborter here, and in one more place:

    userStateChangeAborters.TryRemove(user.UserId, out var aborter);
    

    The way the Channel is created can also lead to leaks:

    taskQueue = Channel.CreateBounded<AsyncWorkItem<T>>(new BoundedChannelOptions(1)
    

    You picked FullMode = BoundedChannelFullMode.DropOldest, which removes the oldest values if there are any, so I assume the dropped items are never processed because they are never read. It's a hypothesis, but if an old item is removed without being handled, then processUserStateUpdateAsync is never called for it and its resources are never freed.

    You can start with these issues; it should be easier to find the real cause once they are fixed.
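
If the hypothesis about dropped items holds, one possible remedy is to complete a work item at the moment the channel evicts it. The sketch below assumes .NET 6 or later, where Channel.CreateBounded has an overload that takes an itemDropped callback. It uses a bare TaskCompletionSource<bool> as a stand-in for the project's AsyncWorkItem<T>, and DropOldestDemo and CreateQueue are hypothetical names.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DropOldestDemo
{
    // Capacity 1 with DropOldest, as in the snippet above. The callback cancels an
    // evicted item so that nothing awaits it forever.
    public static Channel<TaskCompletionSource<bool>> CreateQueue()
    {
        var options = new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        };
        // The itemDropped overload is available from .NET 6 onwards.
        return Channel.CreateBounded<TaskCompletionSource<bool>>(
            options,
            dropped => dropped.TrySetCanceled());
    }

    public static void Main()
    {
        var queue = CreateQueue();
        var first = new TaskCompletionSource<bool>();
        var second = new TaskCompletionSource<bool>();

        queue.Writer.TryWrite(first);
        queue.Writer.TryWrite(second); // the channel is full, so 'first' is evicted

        Console.WriteLine($"first cancelled: {first.Task.IsCanceled}");
        Console.WriteLine($"second still pending: {!second.Task.IsCompleted}");
    }
}
```

On older runtimes without that overload, the writer would have to keep track of the previous item itself and cancel it before writing the next one.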