Given a `BroadcastBlock` with a message in its buffer, is it possible to prevent that message from being sent to newly linked targets? For example:
// Reproduces the scenario from the question: a message is posted before any
// target is linked, and a target is linked only afterwards.
static void Main(string[] args)
{
    BroadcastBlock<string> broadcaster = new BroadcastBlock<string>(item => item);
    ActionBlock<string> printer = new ActionBlock<string>(text => Console.WriteLine(text));

    // Posted while the broadcaster has no linked targets.
    broadcaster.Post("Hello World!");

    // The target is linked only after the message was posted.
    broadcaster.LinkTo(printer);
    // etc.
}
This code will print "Hello World!". Basically, the `BroadcastBlock` will still send the buffered message to the `ActionBlock` on `.LinkTo`, despite the message having been posted prior to the link being established.
Is there a built-in way to prevent this behavior? I only want messages to be sent to current links, not future ones.
I am using System.Threading.Tasks.Dataflow 4.11.1
This behavior is not possible using the built-in `BroadcastBlock` class; its behavior is not configurable. If you desperately need this behavior, you could try the implementation below. It uses an internal `BroadcastBlock<(T, long)>` with an index that is incremented with each new message, so that during linking the currently active message can be filtered out.
There is quite a lot of indirection inside the `BroadcastBlockNewOnly` class, because of the need to translate from `T` to `(T, long)` and back to `T`. This makes the class hard to maintain, and also not very efficient. On every received message a new object is allocated, creating more work for the garbage collector, so use this class with caution.
/// <summary>
/// A broadcast-style block that offers each message only to targets that were
/// already linked when the message was received. Targets linked later do not
/// receive the message that is buffered at the time of linking.
/// </summary>
/// <remarks>
/// Internally wraps a <c>BroadcastBlock&lt;(T, long)&gt;</c>. Each incoming value is
/// paired with an index taken from an incrementing counter, and each link captures the
/// counter value at link time so that messages with an index at or below that value are
/// declined for that link.
/// </remarks>
/// <typeparam name="T">The type of the messages being broadcast.</typeparam>
public class BroadcastBlockNewOnly<T> : ITargetBlock<T>, ISourceBlock<T>
{
    private readonly IPropagatorBlock<(T, long), (T, long)> _broadcastBlock;

    // One stable proxy per real target. The inner block identifies the holder of a
    // reservation by the target instance it was given, so ReserveMessage,
    // ConsumeMessage and ReleaseReservation must all present the SAME proxy object
    // for a given real target. The table holds its keys weakly, so caching a proxy
    // does not keep an otherwise unreachable target alive.
    private readonly System.Runtime.CompilerServices.ConditionalWeakTable<ITargetBlock<T>, TargetProxy> _targetProxies
        = new System.Runtime.CompilerServices.ConditionalWeakTable<ITargetBlock<T>, TargetProxy>();

    // Index handed to the most recently received message (0 = none yet).
    private long _index;

    /// <summary>Creates the block.</summary>
    /// <param name="cloningFunction">Function used to clone each value handed to a target.</param>
    /// <param name="dataflowBlockOptions">Options for the inner block; defaults are used when null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="cloningFunction"/> is null.</exception>
    public BroadcastBlockNewOnly(Func<T, T> cloningFunction,
        DataflowBlockOptions dataflowBlockOptions = null)
    {
        if (cloningFunction == null)
        {
            throw new ArgumentNullException(nameof(cloningFunction));
        }

        _broadcastBlock = new BroadcastBlock<(T, long)>(
            entry => (cloningFunction(entry.Item1), entry.Item2), // clone the value, keep its index
            dataflowBlockOptions ?? new DataflowBlockOptions());
    }

    /// <summary>Completion task of the inner broadcast block.</summary>
    public Task Completion => _broadcastBlock.Completion;

    /// <summary>Signals the inner broadcast block to complete.</summary>
    public void Complete() => _broadcastBlock.Complete();

    void IDataflowBlock.Fault(Exception ex) => _broadcastBlock.Fault(ex);

    /// <summary>
    /// Links <paramref name="target"/> so that it receives only messages whose index is
    /// greater than the index that is current at the time of this call.
    /// </summary>
    /// <param name="target">The target to link.</param>
    /// <param name="linkOptions">Link options forwarded to the inner block.</param>
    /// <returns>The unlinker returned by the inner block.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        // Atomic snapshot of the counter; anything indexed at or below it predates this link.
        long indexAtLinkTime = Interlocked.Read(ref _index);
        var linkedTargetProxy = new LinkedTargetProxy(target, this, indexAtLinkTime);
        return _broadcastBlock.LinkTo(linkedTargetProxy, linkOptions);
    }

    private long GetNewIndex() => Interlocked.Increment(ref _index);

    // Returns the cached proxy for a real target, creating it on first use.
    // A null target is passed through as null so the inner block performs its own
    // argument validation.
    private TargetProxy GetTargetProxy(ITargetBlock<T> target)
    {
        if (target == null)
        {
            return null;
        }

        return _targetProxies.GetValue(target, key => new TargetProxy(key, this));
    }

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader header,
        T value, ISourceBlock<T> source, bool consumeToAccept)
    {
        // When a source is supplied, wrap it so the inner block can call back into it
        // with tuple-typed signatures; SourceProxy indexes whatever it consumes.
        SourceProxy sourceProxy = source != null
            ? new SourceProxy(source, this, GetNewIndex)
            : null;

        return _broadcastBlock.OfferMessage(header, (value, GetNewIndex()),
            sourceProxy, consumeToAccept);
    }

    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<T> target, out bool messageConsumed)
    {
        // The index is internal bookkeeping; only the value is returned to the caller.
        var (value, _) = _broadcastBlock.ConsumeMessage(header, GetTargetProxy(target),
            out messageConsumed);
        return value;
    }

    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        return _broadcastBlock.ReserveMessage(header, GetTargetProxy(target));
    }

    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader header,
        ITargetBlock<T> target)
    {
        _broadcastBlock.ReleaseReservation(header, GetTargetProxy(target));
    }

    /// <summary>
    /// Target registered with the inner block for each link. Declines messages whose
    /// index is at or below the limit captured at link time and forwards the rest,
    /// unwrapped, to the real target.
    /// </summary>
    private sealed class LinkedTargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;
        private readonly long _indexLimit;

        public LinkedTargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource,
            long indexLimit)
        {
            _realTarget = realTarget;
            _realSource = realSource;
            _indexLimit = indexLimit;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, index) = messageValue;

            // Messages indexed at or below the limit were received before this link existed.
            if (index <= _indexLimit)
            {
                return DataflowMessageStatus.Declined;
            }

            // The real target sees the outer block as its source, not the inner one.
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => _realTarget.Complete();
        void IDataflowBlock.Fault(Exception ex) => _realTarget.Fault(ex);
    }

    /// <summary>
    /// Adapts an upstream <c>ISourceBlock&lt;T&gt;</c> to the tuple-typed source
    /// interface expected by the inner block. Consumed values are paired with a
    /// freshly generated index.
    /// </summary>
    private sealed class SourceProxy : ISourceBlock<(T, long)>
    {
        private readonly ISourceBlock<T> _realSource;
        private readonly ITargetBlock<T> _realTarget;
        private readonly Func<long> _getNewIndex;

        public SourceProxy(ISourceBlock<T> realSource, ITargetBlock<T> realTarget,
            Func<long> getNewIndex)
        {
            _realSource = realSource;
            _realTarget = realTarget;
            _getNewIndex = getNewIndex;
        }

        (T, long) ISourceBlock<(T, long)>.ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target, out bool messageConsumed)
        {
            // Consume on behalf of the outer block, then index the value that was obtained.
            T value = _realSource.ConsumeMessage(header, _realTarget,
                out messageConsumed);
            long newIndex = _getNewIndex();
            return (value, newIndex);
        }

        bool ISourceBlock<(T, long)>.ReserveMessage(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            return _realSource.ReserveMessage(header, _realTarget);
        }

        void ISourceBlock<(T, long)>.ReleaseReservation(DataflowMessageHeader header,
            ITargetBlock<(T, long)> target)
        {
            _realSource.ReleaseReservation(header, _realTarget);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
        IDisposable ISourceBlock<(T, long)>.LinkTo(ITargetBlock<(T, long)> target,
            DataflowLinkOptions linkOptions) => throw new NotSupportedException();
    }

    /// <summary>
    /// Stand-in handed to the inner block when a real target consumes, reserves or
    /// releases a message through the outer block. One instance is cached per real target.
    /// </summary>
    private sealed class TargetProxy : ITargetBlock<(T, long)>
    {
        private readonly ITargetBlock<T> _realTarget;
        private readonly ISourceBlock<T> _realSource;

        public TargetProxy(ITargetBlock<T> realTarget, ISourceBlock<T> realSource)
        {
            _realTarget = realTarget;
            _realSource = realSource;
        }

        DataflowMessageStatus ITargetBlock<(T, long)>.OfferMessage(
            DataflowMessageHeader header, (T, long) messageValue,
            ISourceBlock<(T, long)> source, bool consumeToAccept)
        {
            var (value, _) = messageValue;
            return _realTarget.OfferMessage(header, value, _realSource, consumeToAccept);
        }

        Task IDataflowBlock.Completion => throw new NotSupportedException();
        void IDataflowBlock.Complete() => throw new NotSupportedException();
        void IDataflowBlock.Fault(Exception ex) => throw new NotSupportedException();
    }
}