c# .net tpl-dataflow

How can I gain access to the input queue of an ActionBlock?


I'm passing instances of some class to an ActionBlock. If I call

cancellationSource.Cancel();

then processing will stop. But some instances can remain in the input queue of the ActionBlock. I need to gain access to those remaining instances in order to release some resources.

How can I achieve this goal?
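
To make the scenario concrete, here is a minimal sketch of the situation described above. `Resource` is a hypothetical stand-in for the class in question, and the 100 ms delay is only simulated work; neither comes from the original post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical payload type that holds something needing explicit release.
public sealed class Resource : IDisposable
{
    public void Dispose() { /* release the underlying resource here */ }
}

public static class CancelScenario
{
    // Returns true if the block completed in the canceled state.
    public static async Task<bool> RunAsync()
    {
        using var cancellationSource = new CancellationTokenSource();

        var block = new ActionBlock<Resource>(async item =>
        {
            await Task.Delay(100); // simulated slow work
            item.Dispose();        // items that reach the handler are released here
        }, new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationSource.Token
        });

        for (int i = 0; i < 10; i++) block.Post(new Resource());

        // Processing stops, but the items still buffered inside the block
        // never reach the handler, and ActionBlock offers no way to enumerate them.
        cancellationSource.Cancel();

        try { await block.Completion; return false; }
        catch (OperationCanceledException) { return true; }
    }
}
```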


Solution

  • If you desperately need an ActionBlock with an exposed input buffer, you could try the custom implementation below. It supports all the built-in functionality of an ActionBlock and adds a custom IEnumerable<T> InputQueue property. The input buffer is not emptied when the ActionBlockEx completes in a faulted or canceled state, so the remaining items can still be inspected afterwards.

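    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class ActionBlockEx<T> : ITargetBlock<T>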
    {
        private readonly ITargetBlock<object> _actionBlock;
        private readonly Queue<T> _queue;
    
        public ActionBlockEx(Func<T, Task> action,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null)
        {
            if (action == null) throw new ArgumentNullException(nameof(action));
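            _queue = new Queue<T>();
            // The inner block only receives dummy null messages; each one signals
            // that a real item is waiting in _queue.
            _actionBlock = new ActionBlock<object>(_ =>
            {
                T item;
                lock (_queue) item = _queue.Dequeue();
                return action(item);
            }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());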
        }
    
        public ActionBlockEx(Action<T> action,
            ExecutionDataflowBlockOptions dataflowBlockOptions = null) : this(
                item => { action(item); return Task.CompletedTask; }, dataflowBlockOptions)
        {
            if (action == null) throw new ArgumentNullException(nameof(action));
        }
    
        public int InputCount { get { lock (_queue) return _queue.Count; } }
        public IEnumerable<T> InputQueue { get { lock (_queue) return _queue.ToList(); } }
    
        public Task Completion => _actionBlock.Completion;
        public void Complete() => _actionBlock.Complete();
        void IDataflowBlock.Fault(Exception ex) => _actionBlock.Fault(ex);
    
        DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
            T item, ISourceBlock<T> source, bool consumeToAccept)
        {
            var sourceProxy = source != null ? new SourceProxy(source, this) : null;
            lock (_queue)
            {
                var offerResult = _actionBlock.OfferMessage(header, null, sourceProxy,
                    consumeToAccept);
                if (offerResult == DataflowMessageStatus.Accepted
                    && (sourceProxy == null || !sourceProxy.ConsumeMessageInvoked))
                {
                    _queue.Enqueue(item);
                }
                return offerResult;
            }
        }
    
        private class SourceProxy : ISourceBlock<object>
        {
            private readonly ISourceBlock<T> _realSource;
            private readonly ActionBlockEx<T> _realTarget;
    
            public bool ConsumeMessageInvoked { get; private set; }
    
            public SourceProxy(ISourceBlock<T> realSource, ActionBlockEx<T> realTarget)
            {
                _realSource = realSource;
                _realTarget = realTarget;
            }
    
            object ISourceBlock<object>.ConsumeMessage(DataflowMessageHeader header,
                ITargetBlock<object> target, out bool messageConsumed)
            {
                this.ConsumeMessageInvoked = true;
                lock (_realTarget._queue)
                {
                    var item = _realSource.ConsumeMessage(header, _realTarget,
                        out messageConsumed);
                    if (messageConsumed) _realTarget._queue.Enqueue(item);
                }
                return null;
            }
    
            bool ISourceBlock<object>.ReserveMessage(DataflowMessageHeader header,
                ITargetBlock<object> target)
            {
                return _realSource.ReserveMessage(header, _realTarget);
            }
    
            void ISourceBlock<object>.ReleaseReservation(DataflowMessageHeader header,
                ITargetBlock<object> target)
            {
                _realSource.ReleaseReservation(header, _realTarget);
            }
    
            Task IDataflowBlock.Completion => throw new NotSupportedException();
            void IDataflowBlock.Complete() => throw new NotSupportedException();
            void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
            IDisposable ISourceBlock<object>.LinkTo(ITargetBlock<object> target,
                DataflowLinkOptions linkOptions) => throw new NotSupportedException();
        }
    
    }
    

    This implementation is based on an internal ActionBlock<object> that is supplied with dummy null messages. Its communication with the linked ISourceBlock is intercepted so that the actual messages are acquired and stored in an internal Queue<T>. This indirection adds some overhead (an object allocation occurs on every message received), so use this class with caution!
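
    As a rough usage sketch, the leftover items could be released after cancellation as shown below. It assumes the ActionBlockEx<T> class above is in scope; the `Resource` type, the `Demo` class, and the 100 ms delay are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical disposable payload, standing in for "instances of some class".
public sealed class Resource : IDisposable
{
    public void Dispose() { /* release the underlying resource here */ }
}

public static class Demo
{
    // Returns how many items were still queued when the block was canceled.
    public static async Task<int> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        var block = new ActionBlockEx<Resource>(async item =>
        {
            try { await Task.Delay(100, cts.Token); } // simulated work
            finally { item.Dispose(); }               // the handler owns dequeued items
        }, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

        for (int i = 0; i < 10; i++) block.Post(new Resource());

        cts.Cancel();
        try { await block.Completion; }
        catch (OperationCanceledException) { /* expected after Cancel() */ }

        // InputQueue returns a snapshot of the items that never reached the handler.
        int released = 0;
        foreach (var leftover in block.InputQueue)
        {
            leftover.Dispose();
            released++;
        }
        return released;
    }
}
```

    The queue is read only after Completion has finished, so the handler is no longer dequeuing items while the leftovers are being disposed.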