TPL Dataflow block which delays forwarding of the message to the next block


I need a Dataflow block that delays forwarding each message to the next block based on the timestamp in the message (LogEntry).

This is what I came up with, but it doesn't feel right. Any suggestions for improvement?

    private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }

Solution

  • You could simply use a single TransformBlock that asynchronously waits out the delay using Task.Delay:

    IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(TimeSpan delay)
    {
        return new TransformBlock<TItem, TItem>(async item =>
        {
            await Task.Delay(delay);
            return item;
        });
    }
    

    Usage:

    var block = DelayedForwardBlock<LogEntry>(TimeSpan.FromMinutes(5));
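
    Note that the block above applies the same fixed delay to every item, and because a TransformBlock's MaxDegreeOfParallelism defaults to 1, the delays are awaited one after another. A minimal sketch of a timestamp-based variant follows. It assumes the goal is to release each item once `delay` has elapsed since its own timestamp. The `utcTimestampOf` selector parameter, the `DelayBlocks` class name, and the shape of `LogEntry` beyond its `UtcTimestamp` property are illustrative assumptions, not part of the original answer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's LogEntry type;
// only the UtcTimestamp property appears in the original code.
public sealed class LogEntry
{
    public DateTime UtcTimestamp { get; set; }
}

public static class DelayBlocks
{
    // Forwards each item once `delay` has passed since the item's own timestamp.
    // `utcTimestampOf` is an assumed helper that extracts the UTC timestamp from an item.
    public static IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(
        Func<TItem, DateTime> utcTimestampOf, TimeSpan delay)
    {
        return new TransformBlock<TItem, TItem>(async item =>
        {
            // Time still left until the item becomes due.
            TimeSpan remaining = utcTimestampOf(item) + delay - DateTime.UtcNow;
            if (remaining > TimeSpan.Zero)
            {
                await Task.Delay(remaining);
            }
            return item;
        },
        new ExecutionDataflowBlockOptions
        {
            // Let the waits overlap instead of running one at a time.
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    }
}
```

    Usage, under the same assumptions:

    var block = DelayBlocks.DelayedForwardBlock<LogEntry>(e => e.UtcTimestamp, TimeSpan.FromMinutes(5));

    A TransformBlock emits results in input order by default, so an item that is already due is still held back until the items posted before it have been forwarded.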