Search code examples
c# task-parallel-library tpl-dataflow

Prevent BroadcastBlock from sending buffered message on LinkTo


Given a BroadcastBlock with a message in the buffer, is it possible to prevent that message from being sent to newly linked targets? For example:

static void Main(string[] args)
{
    // Build both blocks first; at this point nothing is linked to the broadcaster.
    BroadcastBlock<string> broadcaster = new BroadcastBlock<string>(item => item);
    ActionBlock<string> printer = new ActionBlock<string>(text =>
    {
        Console.WriteLine(text);
    });

    // The message is posted while the broadcaster has no linked targets.
    broadcaster.Post("Hello World!");

    // The target is linked only after the message was posted.
    broadcaster.LinkTo(printer);

    // etc.
}

This code will print "Hello World!". In other words, the BroadcastBlock still offers its buffered message to the ActionBlock when .LinkTo is called, even though the message was posted before the link was established.

Is there a built-in way to prevent this behavior? I only want messages to be sent to current links, not future ones.

I am using System.Threading.Tasks.Dataflow 4.11.1


Solution

  • This behavior is not possible using the built-in BroadcastBlock class. Its behavior is not configurable. If you desperately need this behavior, you could try the implementation below. It uses an internal BroadcastBlock<(T, long)> with an index that is incremented with each new message, so that during linking the currently active message can be filtered out.

    There is quite a lot of indirection inside the BroadcastBlockNewOnly class, because of the need to translate from T to (T, long) and back to T. This makes the class hard to maintain, and also not very efficient. On every received message a new object is allocated, creating more work for the garbage collector, so use this class with caution.

    /// <summary>
    /// A broadcast-style block that offers a message only to targets that were linked
    /// before that message was received. Internally it wraps a BroadcastBlock of
    /// (value, index) tuples; every incoming message gets a monotonically increasing
    /// index, and each linked target declines any message whose index is not greater
    /// than the index that was current when the target was linked.
    /// </summary>
    /// <typeparam name="T">The type of the messages that flow through the block.</typeparam>
    public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
    {
        private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;

        // Exactly one proxy per real target. The inner block is always handed the same
        // proxy instance for a given target, so a reservation taken through
        // ReserveMessage can be matched again by ConsumeMessage/ReleaseReservation, and
        // the proxy used while consuming is the very object that was linked.
        // Keys are held weakly, so proxies do not keep abandoned targets alive.
        private readonly System.Runtime.CompilerServices
            .ConditionalWeakTable<ITargetBlock<T>, TargetProxy> _targetProxies =
                new System.Runtime.CompilerServices
                    .ConditionalWeakTable<ITargetBlock<T>, TargetProxy>();

        // Index of the most recently numbered message; accessed via Interlocked only.
        private long _index;

        /// <summary>Creates the block.</summary>
        /// <param name="cloningFunction">Function applied to a value before it is handed
        /// to a target.</param>
        /// <param name="dataflowBlockOptions">Options for the inner broadcast block;
        /// default options are used when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="cloningFunction"/>
        /// is null.</exception>
        public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
            DataflowBlockOptions dataflowBlockOptions = null)
        {
            if (cloningFunction == null)
                throw new ArgumentNullException(nameof(cloningFunction));
            _broadcastBlock = new BroadcastBlock<(T, long)>(entry =>
            {
                // Clone only the payload; the index travels along unchanged.
                var (value, index) = entry;
                return (cloningFunction(value), index);
            }, dataflowBlockOptions ?? new DataflowBlockOptions());
        }

        /// <summary>Completion task of the inner broadcast block.</summary>
        public Task Completion => _broadcastBlock.Completion;

        /// <summary>Signals the inner broadcast block to complete.</summary>
        public void Complete() => _broadcastBlock.Complete();

        void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

        /// <summary>
        /// Links <paramref name="target"/> so that it receives only messages numbered
        /// after this call read the current index; a message already buffered in the
        /// inner block is declined on the target's behalf.
        /// </summary>
        /// <param name="target">The target to link.</param>
        /// <param name="linkOptions">Link options, forwarded to the inner block.</param>
        /// <returns>A disposable that removes the link, as returned by the inner block.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
        public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
        {
            if (target == null) throw new ArgumentNullException(nameof(target));
            long currentIndex = Interlocked.Read(ref _index);
            TargetProxy proxy = GetTargetProxy(target);
            // Set the limit before linking: linking may immediately trigger an offer
            // of the message that is currently buffered.
            proxy.RaiseIndexLimit(currentIndex);
            return _broadcastBlock.LinkTo(proxy, linkOptions);
        }

        private long GetNewIndex() => Interlocked.Increment(ref _index);

        // Returns the single proxy associated with the given real target, creating it
        // on first use.
        private TargetProxy GetTargetProxy(ITargetBlock<T> target)
            => _targetProxies.GetValue(target, key => new TargetProxy(key, this));

        DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
            T value, ISourceBlock<T> source, bool consumeToAccept)
        {
            var sourceProxy = source != null ?
                new SourceProxy(source, this, GetNewIndex) : null;
            // The offered value is numbered here; SourceProxy.ConsumeMessage numbers
            // the value again if the inner block asks the source to consume it.
            return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
                sourceProxy, consumeToAccept);
        }

        T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<T> target, out bool messageConsumed)
        {
            // A null target is passed through so the inner block does the validation.
            var targetProxy = target != null ? GetTargetProxy(target) : null;
            var (value, _) = _broadcastBlock.ConsumeMessage(header, targetProxy,
                out messageConsumed);
            return value;
        }

        bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<T> target)
        {
            var targetProxy = target != null ? GetTargetProxy(target) : null;
            return _broadcastBlock.ReserveMessage(header, targetProxy);
        }

        void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<T> target)
        {
            var targetProxy = target != null ? GetTargetProxy(target) : null;
            _broadcastBlock.ReleaseReservation(header, targetProxy);
        }

        // Presents the upstream ISourceBlock<T> to the inner block as a source of
        // (T, long) tuples, substituting this block as the target in every callback.
        private sealed class SourceProxy : ISourceBlock<(T, long)>
        {
            private readonly ISourceBlock<T> _realSource;
            private readonly ITargetBlock<T> _realTarget;
            private readonly Func<long> _getNewIndex;

            public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
                Func<long> getNewIndex)
            {
                _realSource = realSource;
                _realTarget = realTarget;
                _getNewIndex = getNewIndex;
            }

            (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
                ITargetBlock<(T, long)> target, out bool messageConsumed)
            {
                var value = _realSource.ConsumeMessage(header, _realTarget,
                    out messageConsumed);
                var newIndex = _getNewIndex();
                return (value, newIndex);
            }

            bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
                ITargetBlock<(T, long)> target)
            {
                return _realSource.ReserveMessage(header, _realTarget);
            }

            void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
                ITargetBlock<(T, long)> target)
            {
                _realSource.ReleaseReservation(header, _realTarget);
            }

            Task IDataflowBlock.Completion => throw new NotSupportedException();
            void IDataflowBlock.Complete() => throw new NotSupportedException();
            void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
            IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
                DataflowLinkOptions linkOptions) => throw new NotSupportedException();
        }

        // Presents a real ITargetBlock<T> to the inner block as a target of (T, long)
        // tuples. It strips the index before forwarding and declines messages whose
        // index does not exceed the limit recorded when the target was linked.
        private sealed class TargetProxy : ITargetBlock<(T, long)>
        {
            private readonly ITargetBlock<T> _realTarget;
            private readonly ISourceBlock<T> _realSource;

            // long.MinValue means "no filtering"; indices handed out by the owning
            // block start at 1, so every message passes until a limit is raised.
            private long _indexLimit = long.MinValue;

            public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
            {
                _realTarget = realTarget;
                _realSource = realSource;
            }

            // Raises the limit to indexLimit unless it is already at least that high.
            // The limit never decreases, so linking the same target again cannot
            // re-enable delivery of an older message.
            public void RaiseIndexLimit(long indexLimit)
            {
                long observed = Interlocked.Read(ref _indexLimit);
                while (observed < indexLimit)
                {
                    long previous = Interlocked.CompareExchange(
                        ref _indexLimit, indexLimit, observed);
                    if (previous == observed) break;
                    observed = previous; // lost a race; re-evaluate against the new value
                }
            }

            DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
                DataflowMessageHeader header, (T, long) messageValue,
                ISourceBlock<(T, long)> source, bool consumeToAccept)
            {
                var (value, index) = messageValue;
                if (index <= Interlocked.Read(ref _indexLimit))
                    return DataflowMessageStatus.Declined;
                // The real target sees the owning block as its source, so its
                // consume/reserve/release callbacks come back through that block.
                return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
            }

            Task IDataflowBlock.Completion => throw new NotSupportedException();
            void IDataflowBlock.Complete() => _realTarget.Complete();
            void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
        }
    }