Search code examples
c#dataflowtpl-dataflow

How to emit a cartesian product in TPL/Dataflow?


I am trying to implement the following behaviour:

/// <summary>
/// Posts two items to each input of a <c>CartesianProductBlock</c>, completes both
/// inputs, drains the linked target and checks that all four pairs were emitted
/// (in any order).
/// </summary>
[TestMethod]
public async Task ProducesCartesianProductOfInputs()
{
    var block = new CartesianProductBlock<int, string>();
    var target = new BufferBlock<Tuple<int, string>>();

    var left = block.Left;
    var right = block.Right;

    // Ask for completion propagation explicitly. DataflowLinkOptions defaults to
    // PropagateCompletion = false, so a block that honours its link options would
    // otherwise never complete `target` and the receive loop below would never end.
    block.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

    var actual = new List<Tuple<int, string>>();

    Assert.IsTrue(left.Post(1));
    Assert.IsTrue(right.Post("a"));
    Assert.IsTrue(left.Post(2));
    Assert.IsTrue(right.Post("b"));

    // Complete() only stops the inputs from accepting new messages; messages that
    // were already posted are still processed before the blocks finish.
    left.Complete();
    right.Complete();

    // OutputAvailableAsync returns false once `target` has completed and is empty.
    while (await target.OutputAvailableAsync())
    {
        actual.Add(await target.ReceiveAsync());
    }

    var expected = new List<Tuple<int, string>>()
    {
        Tuple.Create(1, "a"),
        Tuple.Create(2, "a"),
        Tuple.Create(1, "b"),
        Tuple.Create(2, "b"),
    };

    // Order depends on scheduling, so compare as unordered collections.
    CollectionAssert.AreEquivalent(expected, actual);
}

My current (partial) implementation does not work and I can't figure out why:

/// <summary>
/// A block that remembers every message it receives on two channels, and pairs every
/// message on a channel to every message on the other channel.
/// </summary>
/// <remarks>
/// This is the non-working version discussed in the question. The review notes below
/// mark the places where the visible code can misbehave.
/// </remarks>
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    // One inner block per input; each emits the pairs produced by a newly received message.
    private TransformManyBlock<T1, Tuple<T1, T2>> left;
    private TransformManyBlock<T2, Tuple<T1, T2>> right;

    // Every message seen so far on each input; both lists are mutated under lockObject.
    private List<T1> leftReceived = new List<T1>();
    private List<T2> rightReceived = new List<T2>();
    // Targets registered through LinkTo; completed once both inputs have completed.
    private List<ITargetBlock<Tuple<T1, T2>>> targets = new List<ITargetBlock<Tuple<T1, T2>>>();

    private object lockObject = new object();

    /// <summary>Input for the first element of each pair.</summary>
    public ITargetBlock<T1> Left { get { return left; } }
    /// <summary>Input for the second element of each pair.</summary>
    public ITargetBlock<T2> Right { get { return right; } }

    /// <summary>
    /// Creates the two inner blocks and schedules completion of all registered targets
    /// for when both inputs have completed.
    /// </summary>
    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Pair this input up with all received alternatives
                // NOTE(review): Select is lazily evaluated, so the returned sequence is only
                // enumerated after this lock has been released. By then the other input may
                // be adding to rightReceived, which makes the output non-deterministic.
                return rightReceived.Select(r => Tuple.Create(l, r));
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock(lockObject)
            {
                rightReceived.Add(r);
                // Pair this input up with all received alternatives
                // NOTE(review): same lazy-evaluation hazard as in the left block above.
                return leftReceived.Select(l => Tuple.Create(l, r));
            }
        });
        // Runs once both inputs have finished, regardless of how they finished.
        Task.WhenAll(Left.Completion, Right.Completion).ContinueWith(_ => {
            // TODO: Respect propagate completion linkOptions. Defaulting to propagation for now.
            // NOTE(review): targets is enumerated here without taking lockObject, while
            // LinkTo adds to it without the lock as well.
            foreach (var target in targets)
            {
                target.Complete();
            }
        });
    }

    // NOTE(review): nothing in this class ever completes this source, so Completion
    // never finishes.
    private TaskCompletionSource<int> completion = new TaskCompletionSource<int>();

    /// <summary>Task backed by <c>completion</c>.</summary>
    public Task Completion => completion.Task;

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public void Complete() { throw new NotImplementedException(); }
    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public void Fault(Exception exception) { throw new NotImplementedException(); }

    /// <summary>
    /// Links both inner blocks to <paramref name="target"/> and records the target so it
    /// can be completed later.
    /// </summary>
    /// <param name="target">The block that should receive the produced pairs.</param>
    /// <param name="linkOptions">Currently ignored.</param>
    /// <returns>Always <c>null</c>; callers cannot unlink.</returns>
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        left.LinkTo(target);
        right.LinkTo(target);
        targets.Add(target);
        return null; // TODO: Return something proper to allow unlinking
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }
}

I'm experiencing the following (probably related) issues:

  • It is non-deterministic. The test fails in different ways.
  • It appears (from adding in logging, and also since I get anywhere from 3 to 6 output messages) that the Complete call to the two inputs is causing messages to not be processed, though my understanding is that it should allow all queues to drain first. (And if this is not the case, then I don't know how to write the test correctly.)
  • It's quite possible my locking scheme is wrong/suboptimal, though my goal was to have something big and coarse that worked before trying to fix.
  • My experiments with individual TransformManyBlocks have failed to turn up anything surprising, and I can't figure out what's different in this case.

Solution

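  • As suspected, this was related to completion propagation. The version below also materializes each result with `.ToList()` while the lock is still held, so the lazily evaluated `Select` can no longer read a list that the other input is modifying. Here is a working version, including a proper link disposable and respecting link options: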

    /// <summary>
    /// A source block that remembers every message it receives on two inputs and pairs
    /// each new message with every message received so far on the other input.
    /// </summary>
    /// <typeparam name="T1">Type of the messages posted to <see cref="Left"/>.</typeparam>
    /// <typeparam name="T2">Type of the messages posted to <see cref="Right"/>.</typeparam>
    public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
    {
        // One inner block per input; each emits the pairs produced by a newly received message.
        private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
        private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;

        // Every message seen so far on each input; only touched while holding lockObject.
        private readonly List<T1> leftReceived = new List<T1>();
        private readonly List<T2> rightReceived = new List<T2>();

        private readonly object lockObject = new object();

        // Finishes when both inner blocks have finished. Task.WhenAll carries a fault or
        // cancellation of either input through to this task.
        private readonly Task completion;

        /// <summary>Input for the first element of each pair.</summary>
        public ITargetBlock<T1> Left { get { return left; } }

        /// <summary>Input for the second element of each pair.</summary>
        public ITargetBlock<T2> Right { get { return right; } }

        /// <summary>Creates the block with two empty inputs.</summary>
        public CartesianProductBlock()
        {
            left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
            {
                lock (lockObject)
                {
                    leftReceived.Add(l);
                    // ToList() materializes the pairs while the lock is held; a lazy Select
                    // would be enumerated later, when rightReceived may already be changing.
                    return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
                }
            });
            right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
            {
                lock (lockObject)
                {
                    rightReceived.Add(r);
                    // Materialized under the lock for the same reason as above.
                    return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
                }
            });

            completion = Task.WhenAll(left.Completion, right.Completion);
        }

        /// <summary>
        /// Completes once both inputs have completed; faulted if either input faulted.
        /// </summary>
        public Task Completion => completion;

        /// <summary>Signals both inputs that no further messages will be posted.</summary>
        public void Complete()
        {
            left.Complete();
            right.Complete();
        }

        /// <summary>Puts both inputs into a faulted state.</summary>
        /// <param name="exception">The exception that caused the fault.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
        public void Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException(nameof(exception));
            }

            ((IDataflowBlock)left).Fault(exception);
            ((IDataflowBlock)right).Fault(exception);
        }

        /// <summary>
        /// Links both inputs' outputs to <paramref name="target"/>.
        /// </summary>
        /// <param name="target">The block that should receive the produced pairs.</param>
        /// <param name="linkOptions">
        /// Only <see cref="DataflowLinkOptions.PropagateCompletion"/> is honoured; the
        /// inner links are created with default options.
        /// </param>
        /// <returns>A handle that removes both inner links when disposed.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="target"/> or <paramref name="linkOptions"/> is null.
        /// </exception>
        public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
        {
            if (target == null)
            {
                throw new ArgumentNullException(nameof(target));
            }

            if (linkOptions == null)
            {
                throw new ArgumentNullException(nameof(linkOptions));
            }

            // The inner links never propagate completion themselves: the target may only
            // be completed once BOTH inputs are done, which is handled below.
            var link = new Link(left.LinkTo(target), right.LinkTo(target));

            if (linkOptions.PropagateCompletion)
            {
                Task.WhenAny(completion, link.Completion).ContinueWith(_ =>
                {
                    // If the link has been disposed of, we should no longer propagate completion.
                    if (link.Completion.IsCompleted)
                    {
                        return;
                    }

                    if (completion.IsFaulted)
                    {
                        target.Fault(completion.Exception);
                    }
                    else
                    {
                        target.Complete();
                    }
                });
            }

            return link;
        }

        // The three members below are not implemented. LinkTo attaches targets directly to
        // the inner blocks, so targets presumably negotiate messages with those blocks and
        // never call back into this wrapper.

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Combines the two inner links into one handle and exposes a task that finishes
        /// when the handle is disposed.
        /// </summary>
        private sealed class Link : IDisposable
        {
            private readonly IDisposable leftLink;
            private readonly IDisposable rightLink;
            private readonly TaskCompletionSource<VoidResult> completionSource = new TaskCompletionSource<VoidResult>();

            public Link(IDisposable leftLink, IDisposable rightLink)
            {
                this.leftLink = leftLink;
                this.rightLink = rightLink;
            }

            /// <summary>Completes when <see cref="Dispose"/> has been called.</summary>
            public Task Completion { get { return completionSource.Task; } }

            /// <summary>Removes both inner links and marks this link as disposed.</summary>
            public void Dispose()
            {
                leftLink.Dispose();
                rightLink.Dispose();
                // TrySetResult rather than SetResult: SetResult throws when the task is
                // already completed, which would make a second Dispose call fail.
                completionSource.TrySetResult(VoidResult.Instance);
            }
        }

        /// <summary>Placeholder result type for task completion sources that carry no value.</summary>
        private sealed class VoidResult
        {
            private static readonly VoidResult instance = new VoidResult();

            public static VoidResult Instance { get { return instance; } }

            private VoidResult() { }
        }
    }