Search code examples
Tags: c#, .net, tpl-dataflow

TPL Dataflow: process the N latest messages


I'm trying to create a queue that processes only the N most recently received messages. Right now I have this:

/// <summary>
/// Builds the messaging pipeline: a BroadcastBlock that forwards every posted string
/// to a slow, single-threaded ActionBlock, and also to a NullTarget that discards it.
/// </summary>
private static void SetupMessaging()
{
    // BroadcastBlock<T> is configured with plain DataflowBlockOptions; execution settings
    // such as MaxDegreeOfParallelism do not apply to it, so only the base options are set.
    var broadcastOptions = new DataflowBlockOptions
    {
        EnsureOrdered = true,
        MaxMessagesPerTask = 1
    };

    // The worker holds at most two messages (including the one being processed)
    // and handles them one at a time.
    var workerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 2,
        EnsureOrdered = true,
        MaxDegreeOfParallelism = 1,
        MaxMessagesPerTask = 1
    };

    // Simulated slow work: print the message, then block for five seconds.
    Action<string> handleMessage = msg =>
    {
        Console.WriteLine(msg);
        Thread.Sleep(5000);
    };

    var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };

    _messagingBroadcastBlock = new BroadcastBlock<string>(msg => msg, broadcastOptions);
    _messagingActionBlock = new ActionBlock<string>(handleMessage, workerOptions);

    _messagingBroadcastBlock.LinkTo(_messagingActionBlock, propagateCompletion);
    _messagingBroadcastBlock.LinkTo(DataflowBlock.NullTarget<string>());
}

The problem is that if I post 1, 2, 3, 4, 5 to it, I get 1, 2, 5, but I'd like to get 1, 4, 5. Any suggestions are welcome.
Update 1
I was able to make the following solution work:

/// <summary>
/// Wraps an <see cref="ActionBlock{TInput}"/> so that at most a fixed number of
/// posted-but-not-yet-started messages are kept. When posting a new message pushes the
/// number of pending messages above that limit, the oldest pending messages are cancelled
/// and skipped instead of the new message being rejected.
/// </summary>
/// <typeparam name="T">Type of the messages handled by the block.</typeparam>
class FixedCapacityActionBlock<T>
{
    private readonly ActionBlock<CancellableMessage<T>> _actionBlock;

    // Messages accepted by Post whose delegate has not started yet, oldest first.
    // Guarded by _syncRoot.
    private readonly System.Collections.Generic.LinkedList<CancellableMessage<T>> _pending =
        new System.Collections.Generic.LinkedList<CancellableMessage<T>>();

    // Maximum number of pending messages, or DataflowBlockOptions.Unbounded for no limit.
    private readonly int _maxQueueSize;

    private readonly object _syncRoot = new object();

    /// <summary>Creates the block.</summary>
    /// <param name="act">Action invoked for every message that is not dropped.</param>
    /// <param name="opt">
    /// Options forwarded to the inner ActionBlock, except BoundedCapacity, which is used as the
    /// maximum number of pending (not yet started) messages instead. Null means default options.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="act"/> is null.</exception>
    public FixedCapacityActionBlock(Action<T> act, ExecutionDataflowBlockOptions opt)
    {
        if (act == null) throw new ArgumentNullException(nameof(act));
        opt = opt ?? new ExecutionDataflowBlockOptions();

        var options = new ExecutionDataflowBlockOptions
        {
            EnsureOrdered = opt.EnsureOrdered,
            CancellationToken = opt.CancellationToken,
            MaxDegreeOfParallelism = opt.MaxDegreeOfParallelism,
            MaxMessagesPerTask = opt.MaxMessagesPerTask,
            NameFormat = opt.NameFormat,
            SingleProducerConstrained = opt.SingleProducerConstrained,
            TaskScheduler = opt.TaskScheduler,
            // BoundedCapacity is intentionally not forwarded: the inner block must accept every
            // message so that dropping is decided by Post, not by postponement.
        };
        _actionBlock = new ActionBlock<CancellableMessage<T>>(cmsg =>
        {
            using (cmsg)
            {
                lock (_syncRoot)
                {
                    // Dropped by Post while it waited in the inner block's input queue.
                    if (cmsg.CancellationTokenSource.IsCancellationRequested)
                    {
                        return;
                    }

                    // From here on the message is in flight and can no longer be dropped.
                    _pending.Remove(cmsg);
                }

                act(cmsg.Message);
            }
        }, options);

        _maxQueueSize = opt.BoundedCapacity;
    }

    /// <summary>
    /// Offers a message to the block. If more than BoundedCapacity messages are then pending,
    /// the oldest pending ones are cancelled and will be skipped without running the action.
    /// </summary>
    /// <param name="msg">The message to process.</param>
    /// <returns>False if the inner ActionBlock declined the message; otherwise true.</returns>
    public bool Post(T msg)
    {
        var fullMsg = new CancellableMessage<T>(msg);

        lock (_syncRoot)
        {
            // Register before posting: the delegate takes the same lock before touching
            // _pending, so it can never observe a message that is not yet registered.
            var node = _pending.AddLast(fullMsg);

            if (!_actionBlock.Post(fullMsg))
            {
                _pending.Remove(node);
                fullMsg.Dispose();
                return false;
            }

            // Evict from the front (oldest). The newest message is never evicted because the
            // limit is at least 1 whenever it is not Unbounded.
            while (_maxQueueSize != DataflowBlockOptions.Unbounded && _pending.Count > _maxQueueSize)
            {
                var oldest = _pending.First.Value;
                _pending.RemoveFirst();
                oldest.CancellationTokenSource.Cancel();
            }

            return true;
        }
    }
}

And

/// <summary>
/// Pairs a message with its own <see cref="System.Threading.CancellationTokenSource"/> so the
/// message can be flagged as cancelled after it has been queued.
/// </summary>
/// <typeparam name="T">Type of the wrapped message.</typeparam>
class CancellableMessage<T> : IDisposable
{
    /// <summary>Wraps <paramref name="msg"/> together with a new, non-cancelled token source.</summary>
    /// <param name="msg">The payload to wrap.</param>
    public CancellableMessage(T msg)
    {
        CancellationTokenSource = new CancellationTokenSource();
        Message = msg;
    }

    /// <summary>The wrapped payload.</summary>
    public T Message { get; set; }

    /// <summary>Token source used to mark this message as cancelled; may be reassigned or set to null.</summary>
    public CancellationTokenSource CancellationTokenSource { get; set; }

    /// <summary>Disposes the token source, if one is present.</summary>
    public void Dispose() => CancellationTokenSource?.Dispose();
}

While this works and does the job, the implementation looks dirty and is possibly not thread-safe.


Solution

  • Here is a TransformBlock and ActionBlock implementation that drops the oldest messages in its queue whenever newer messages are received and the BoundedCapacity limit has been reached. It behaves quite similarly to a Channel configured with BoundedChannelFullMode.DropOldest.

    /// <summary>
    /// Creates a block that behaves like a <see cref="TransformBlock{TInput, TOutput}"/>, except that
    /// once <see cref="DataflowBlockOptions.BoundedCapacity"/> inputs are waiting to be processed,
    /// each newly received input evicts the oldest waiting one instead of being postponed.
    /// </summary>
    /// <param name="transform">Asynchronous projection applied to every input that is not dropped.</param>
    /// <param name="dataflowBlockOptions">
    /// Options for the processing stage; null means default options. BoundedCapacity limits both the
    /// number of waiting inputs and the output buffer. The instance is read but never modified.
    /// </param>
    /// <param name="droppedMessages">Optional sink that is given every input evicted from the queue.</param>
    /// <returns>An encapsulated block whose outputs are the transform results.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
    public static IPropagatorBlock<TInput, TOutput>
        CreateTransformBlockDropOldest<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IProgress<TInput> droppedMessages = null)
    {
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

        var boundedCapacity = dataflowBlockOptions.BoundedCapacity;
        var cancellationToken = dataflowBlockOptions.CancellationToken;

        // Inputs waiting to be processed, oldest first. Guarded by lock (queue).
        var queue = new Queue<TInput>(Math.Max(0, boundedCapacity));

        var outputBlock = new BufferBlock<TOutput>(new DataflowBlockOptions()
        {
            BoundedCapacity = boundedCapacity,
            CancellationToken = cancellationToken
        });

        // The worker receives one null "signal" per queued item. A signal is posted only when
        // the queue grows; an eviction reuses the evicted item's outstanding signal. Therefore
        // the number of unprocessed signals always equals queue.Count, which is already bounded.
        // The worker itself must be unbounded: a bounded worker rejects signals while
        // MaxDegreeOfParallelism items are in flight, which would strand queued items.
        // A copy is used so the caller's options object is never mutated.
        var workerOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = DataflowBlockOptions.Unbounded,
            CancellationToken = cancellationToken,
            EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
            MaxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism,
            MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
            NameFormat = dataflowBlockOptions.NameFormat,
            SingleProducerConstrained = dataflowBlockOptions.SingleProducerConstrained,
            TaskScheduler = dataflowBlockOptions.TaskScheduler
        };
        var transformBlock = new ActionBlock<object>(async _ =>
        {
            TInput item;
            lock (queue)
            {
                // Defensive: the queue is cleared once the worker completes.
                if (queue.Count == 0) return;
                item = queue.Dequeue();
            }
            var result = await transform(item).ConfigureAwait(false);
            await outputBlock.SendAsync(result, cancellationToken).ConfigureAwait(false);
        }, workerOptions);

        var inputBlock = new ActionBlock<TInput>(item =>
        {
            var droppedEntry = (Exists: false, Item: default(TInput));
            lock (queue)
            {
                if (queue.Count == boundedCapacity)
                {
                    // Full: evict the oldest waiting input. Its signal is still outstanding
                    // and will be consumed for the new item, so no new signal is posted.
                    droppedEntry = (true, queue.Dequeue());
                    queue.Enqueue(item);
                }
                else if (transformBlock.Post(null))
                {
                    queue.Enqueue(item);
                }
                // Otherwise the worker no longer accepts signals (completed or faulted), so the
                // item could never be processed and is not queued.
            }
            if (droppedEntry.Exists) droppedMessages?.Report(droppedEntry.Item);
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        });

        _ = PropagateCompletionAsync(inputBlock, transformBlock);
        _ = PropagateFailureAsync(transformBlock, inputBlock);
        _ = PropagateCompletionAsync(transformBlock, outputBlock);
        _ = transformBlock.Completion.ContinueWith(_ => { lock (queue) queue.Clear(); },
            TaskScheduler.Default);

        return DataflowBlock.Encapsulate(inputBlock, outputBlock);

        // Completes or faults target once source has finished, mirroring source's outcome.
        async Task PropagateCompletionAsync(IDataflowBlock source, IDataflowBlock target)
        {
            // The outcome is inspected through source.Completion below, so nothing is lost here.
            try { await source.Completion.ConfigureAwait(false); } catch { }
            var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (exception != null) target.Fault(exception); else target.Complete();
        }

        // Faults target if source faults; successful completion is not propagated.
        async Task PropagateFailureAsync(IDataflowBlock source, IDataflowBlock target)
        {
            try { await source.Completion.ConfigureAwait(false); } catch { }
            if (source.Completion.IsFaulted) target.Fault(source.Completion.Exception);
        }
    }
    
    // Overload with synchronous lambda
    /// <summary>
    /// Variant of CreateTransformBlockDropOldest that accepts a synchronous transform.
    /// </summary>
    /// <param name="transform">Synchronous projection applied to every input that is not dropped.</param>
    /// <param name="dataflowBlockOptions">Options forwarded unchanged to the asynchronous overload.</param>
    /// <param name="droppedMessages">Optional sink that is given every evicted input.</param>
    /// <returns>The block created by the asynchronous overload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="transform"/> is null.</exception>
    public static IPropagatorBlock<TInput, TOutput>
        CreateTransformBlockDropOldest<TInput, TOutput>(
        Func<TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IProgress<TInput> droppedMessages = null)
    {
        // Validate here: the wrapper lambda below is never null, so the asynchronous overload's
        // own check cannot detect a null synchronous transform.
        if (transform == null) throw new ArgumentNullException(nameof(transform));
        return CreateTransformBlockDropOldest(item => Task.FromResult(transform(item)),
            dataflowBlockOptions, droppedMessages);
    }
    
    // ActionBlock equivalent
    /// <summary>
    /// Creates a target block that runs <paramref name="action"/> on each message and drops the
    /// oldest waiting message when BoundedCapacity is reached, built on CreateTransformBlockDropOldest.
    /// </summary>
    /// <param name="action">Asynchronous action run for every message that is not dropped.</param>
    /// <param name="dataflowBlockOptions">Options forwarded to CreateTransformBlockDropOldest.</param>
    /// <param name="droppedMessages">Optional sink that is given every evicted message.</param>
    /// <returns>The underlying block, exposed only as a target.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public static ITargetBlock<TInput>
        CreateActionBlockDropOldest<TInput>(
        Func<TInput, Task> action,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IProgress<TInput> droppedMessages = null)
    {
        if (action is null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Adapt the action to a transform whose result is always null.
        Func<TInput, Task<object>> runAction = async item =>
        {
            await action(item).ConfigureAwait(false);
            return null;
        };

        var dropOldestBlock = CreateTransformBlockDropOldest<TInput, object>(
            runAction, dataflowBlockOptions, droppedMessages);

        // The null results carry no information; hand them to a NullTarget so they are discarded.
        dropOldestBlock.LinkTo(DataflowBlock.NullTarget<object>());
        return dropOldestBlock;
    }
    
    // ActionBlock equivalent with synchronous lambda
    /// <summary>
    /// Variant of CreateActionBlockDropOldest that accepts a synchronous action.
    /// </summary>
    /// <param name="action">Synchronous action run for every message that is not dropped.</param>
    /// <param name="dataflowBlockOptions">Options forwarded unchanged to the asynchronous overload.</param>
    /// <param name="droppedMessages">Optional sink that is given every evicted message.</param>
    /// <returns>The block created by the asynchronous overload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public static ITargetBlock<TInput>
        CreateActionBlockDropOldest<TInput>(
        Action<TInput> action,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null,
        IProgress<TInput> droppedMessages = null)
    {
        // Validate here: the wrapper lambda below is never null, so the asynchronous overload's
        // own check cannot detect a null synchronous action.
        if (action == null) throw new ArgumentNullException(nameof(action));
        return CreateActionBlockDropOldest(
            item => { action(item); return Task.CompletedTask; },
            dataflowBlockOptions, droppedMessages);
    }
    

    The idea is to store the queued items in an auxiliary Queue and to pass dummy (null) values to an internal ActionBlock<object>. That block ignores the values it receives and instead takes an item from the queue, if there is one. A lock is used to ensure that all non-dropped items in the queue are eventually processed (unless, of course, an exception occurs).

    There is also an extra feature: an optional IProgress<TInput> droppedMessages argument lets the caller receive a notification every time a message is dropped.

    Usage example:

    // Keep at most two waiting messages; older ones are dropped and reported.
    var dropOldestOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };
    var droppedReporter = new Progress<string>(msg => Console.WriteLine($"Message dropped: {msg}"));

    // Simulated slow work: print the message, then block for five seconds.
    _messagingActionBlock = CreateActionBlockDropOldest<string>(msg =>
    {
        Console.WriteLine($"Processing: {msg}");
        Thread.Sleep(5000);
    }, dropOldestOptions, droppedReporter);