TPL Dataflow with Railway Programming


I'm implementing a moderately complex TPL Dataflow pipeline with Railway Programming: a message continues down the "happy path" if it succeeds in the current block, or is routed to the failure block if it doesn't. The problem is that completion keeps getting propagated unexpectedly. Everything works when I run locally, but running the same code on a VM causes issues.

Once matchBuffer is filled, I call Complete() on the "Head" of the pipeline, which is matchBuffer.

    private void CreatePipeline()
    {
        int blockCapacity;
        if (!Int32.TryParse(ConfigurationManager.AppSettings["TPLBlockCapacity"], out blockCapacity))
        {
            logger.WriteInformation("TPLBlockCapacity app.config value could not be properly parsed.  TPLBlockCapacity will default to 1000.");
            blockCapacity = 1000;
        }
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var blockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = blockCapacity };
        matcher = new Matcher(secondarySource, logger);
        comparer = new Comparer(logger, report, fileSystem, dateTime);
        confirmer = new Confirmer(confirmationSource, logger, dateTime, report, fileSystem);
        changer = new Changer(logger, rioEmployeeEditor);
        unconfirmedWriter = new UnconfirmedWriter(logger, rioEmployeeEditor);
        failer = new Failer(logger, report, fileSystem, dateTime);
        finalizer = new Finalizer(logger, fileSystem, archiveFactory);


        var matchBuffer = new BufferBlock<IResult>(blockOptions);
        var matcherBlock = new TransformBlock<IResult, IResult>(result => matcher.Process(result));
        var failureBlock = new TransformBlock<IResult, IResult>(result => failer.Process(result));
        var comparerBlock = new TransformBlock<IResult, IResult>(result => comparer.Process(result));
        var confirmerBlock = new TransformBlock<IResult, IResult>(result => confirmer.Process(result));
        var confirmBroadcaster = new BroadcastBlock<IResult>(x => x.Clone());
        var unconfirmedBlock = new ActionBlock<IResult>(result => unconfirmedWriter.Process(result));
        var changerBlock = new TransformBlock<IResult, IResult>(result => changer.Process(result));
        var finalizerBlock = new ActionBlock<IResult>(result => finalizer.Process(result));

        //Message buffer from Selector links
        matchBuffer.LinkTo(matcherBlock, linkOptions, result => result.Success);

        //Matcher links
        matcherBlock.LinkTo(comparerBlock, linkOptions, result => result.Success);
        matcherBlock.LinkTo(failureBlock, result => !result.Success);

        //Comparer links
        comparerBlock.LinkTo(confirmerBlock, linkOptions, result => result.Success);
        comparerBlock.LinkTo(failureBlock, result => !result.Success);

        //Confirmer links
        confirmerBlock.LinkTo(confirmBroadcaster, linkOptions, result => result.Success);
        confirmerBlock.LinkTo(failureBlock, result => !result.Success);

        //ConfirmBroadcaster links
        confirmBroadcaster.LinkTo(changerBlock, result => result.Success);
        confirmBroadcaster.LinkTo(unconfirmedBlock, linkOptions, result => result.Success);

        changerBlock.LinkTo(finalizerBlock, linkOptions);
        failureBlock.LinkTo(finalizerBlock);

        unconfirmedBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("Unconfirmed Block Complete.");
            failureBlock.Complete();

        });
        failureBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("Failure Block Complete.");
            changerBlock.Complete();

        });
        Head = matchBuffer;
        Tail = finalizerBlock;
        ErrorBlock = failureBlock;
    }
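
When a block is fed by several upstream blocks, as failureBlock is above, setting PropagateCompletion on any single link would complete it as soon as that one source finished. One common alternative to chaining ContinueWith calls by hand is to complete the shared block only after every source has completed, using Task.WhenAll. The following is a minimal sketch of that idea under stated assumptions, not the pipeline above: the class and block names, the int payload, and the convention that a negative number marks a failed result are all hypothetical stand-ins for IResult and result.Success.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SharedFailureBlockSketch
{
    // Hypothetical two-stage railway: a stage "fails" an item by negating it.
    // Returns every value that ended up in the shared failure block.
    public static async Task<List<int>> RunAsync(IEnumerable<int> positiveInputs)
    {
        var failures = new List<int>();
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Stage one fails odd numbers; stage two fails multiples of three.
        var stageOne = new TransformBlock<int, int>(x => x % 2 == 0 ? x : -x);
        var stageTwo = new TransformBlock<int, int>(x => x % 3 == 0 ? -x : x);
        var success = new ActionBlock<int>(x => { });
        // Default MaxDegreeOfParallelism is 1, so Add is never called concurrently.
        var failure = new ActionBlock<int>(x => failures.Add(x));

        // Happy path: completion propagates along these links.
        stageOne.LinkTo(stageTwo, propagate, x => x > 0);
        stageTwo.LinkTo(success, propagate, x => x > 0);

        // Failure path: no propagation, because failure has two sources.
        stageOne.LinkTo(failure, x => x <= 0);
        stageTwo.LinkTo(failure, x => x <= 0);

        // Complete the shared block only once both of its sources are done.
        var completeFailure = Task.WhenAll(stageOne.Completion, stageTwo.Completion)
            .ContinueWith(t => failure.Complete());

        foreach (var input in positiveInputs)
        {
            // The blocks here are unbounded, so Post is always accepted.
            stageOne.Post(input);
        }
        stageOne.Complete();

        await Task.WhenAll(success.Completion, failure.Completion);
        return failures;
    }

    public static async Task Main()
    {
        var failures = await RunAsync(new[] { 1, 2, 3, 4, 5, 6 });
        Console.WriteLine(failures.Count); // 4 (the items -1, -3, -5 and -6)
    }
}
```

A source block's Completion task finishes only after all of its output has been handed to a target, so by the time the continuation calls failure.Complete(), every failed item has already been accepted by the failure block.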

Every scenario I can think of works fine locally: all blocks make it through to the finalizer and I get my expected results. When I push to our VM, I get completely different behavior: the application ends prematurely and the finalizer doesn't even begin to execute. Any of these blocks can take an arbitrary amount of time to finish, but that doesn't seem to be an issue locally. I can put a Thread.Sleep in any of the blocks and it still performs as expected locally, yet I still get a premature shutdown on the VM.

To see my intended flow, save the following to an XML file and open it at www.draw.io. Black lines are data flow, red lines are propagation flow.

    <mxfile userAgent="Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.84 Safari/537.36" version="5.5.1.6" editor="www.draw.io" type="device"><diagram>3Vtds6I4EP01Ps4UCKj3cbxzZ/Zhp2qr3K3ZfcxohOwgsWK8H/PrJ5FuPoIoKISrvkiaJoTu032aJIy8x83rV0G20Te+ovFo7KxeR97n0XjsTqa++tOSt1QynTmpIBRsBUq5YMF+URCi2p6t6K6kKDmPJduWhUueJHQpSzIiBH8pq615XL7rloR4x1ywWJK4Kv3OVjJKpbPxJJf/QVkY4Z3dyUN65gdZ/gwF3ydwv9HYWx9+6ekNwb7gQXcRWfGXgsh7UnYVnKue9dHm9ZHG2rZotvS6LzVns3ELmsDYzlwwg3HIN3x2ulKmgCYXMuIhT0j8lEvnh+ejugdHtSK5idWhqw7pK5P/avHHAFr/wZn/qZRv4Geyl1yJ8r7/5HwLetXxwyPt+F4sYYSqfUAEESEFLS8V6bEXLoNn/kr5hkrxphQEjYlkz2U3E0BLmOnlFlMHYLTjBoSxPJN4D50uaKwgyUXFsC8Rk3SxJYfHeFGBUzZe7aM/UyEphtbxx4KzE0AVRF0AzZccwi4CLyrAFy+7xg4uBEDfQFLGEW+FU7qpz+kOOgUZIKoIMkhq9kHm9xOlBdM6H8eZObNzmW0Ppm5q3W1MWPJBqDu/FzMfLv0kBNFXocKWs0TuCj3/pQW1seQCg2WJ1FD3r1LHdo6HdLg5OrLnbgSYKVr6OGASnpxESFNXZ3DJGjlczsOg1/x93OwepkSwO5Yl2EM6JLjolPtMd5spNIV4paMLXAmWKRDMNyKXEVX84sz367U+aEc0axbHjzxWDKW1sT7x5jsp+E9aOOMcfr1Qk+tZ5KYs79wLN0GGLAYQVKZ9B1DmI0xcD80iqD77XhMa8NDd+hWP2zPfKf+38avhxLKj3WkvjjUz2mx6rWOb3adr5vP7qUMrmLBfKVkL+isrJWw3LX3aqfuKkGrxcm5oHmavM6C+BHf1NH177JzNw1hhZ6hw7oadISqLgQrvb7bZ2cdydxB2Duyw85B+Tcm4f8cOxc4ns+0FmAggTd4fO1sL+ivZ2TcnhM/QbTv1IEiD/iJ29hu+jV+AO/BOgZ0f+UZ79Cbp2cPot0LPvaTxAekZwrIYqfDOYH9i14Plg7vhyCPGvXeOPJnzLuFIsNf9caSNyOuCIwNATlPSa6c+6XjGA6xa4rZkzcTmNsktQMKxQm7dziDmWXbafZot5VTgjYHjxoXXgwza5Ug4U/5Vrjbrv5pFlEpHmOhxswn228MsTy98jXAZohqCBFwCl6XFuGyWqaXfWqPUWPSbQoapg1NlXGX96wnewmq+UfTli/ttQdSO4Y+h6X2kKoNlJu1mnQ31M4kN1zb7z0dg7yr368VhwclqSXbyNuuAic05aNy+Z+8l11bNjeE3xBuZh4QOLn3oKcEb/D97OJ3fPQNpbfUfynt7ruYDdFEhiv9J1M7WQw2vvOx8FypabyOITepEBNgIYs/SXlZrtRkWYqXNcWAo28WZ6yCrDbGWNOlnTuRk3X35K1zLDD2kl819bIN6GS1RLGcikoTtc18Xmcx8a7CaymzXI0c2cHabyiyVHGYt4DqmOzpcVOp0MqDbPben3gtbOWyQiQFzQqdhUqpWbkZHWLmdm2FqvSRgkKVRIZ7Vn0G7s4oSnFdIpF8IiwfJo5WXAJtfpSC9FQ3BVDypMHsPnOJmD27FGP1svoDJJUwxXU081Zq77+yDO5rrdjg3nZU+1093RDSt1k0Lqr5AHDt6l0NMJeOJaixYqEw8Gk9iZdD5ij2rw1AfouiHiomyRN25pGfAR8FeywuoEHTHfpEfBwXtbe1hJdFNfVpFXpio4
5iu9YU6cpj66vITiKX2/nynQpAl4d8HKHzwNa+l+VfdM5iPgs/dBGO2OQ6D0T+y8IQ6xVg02ahBLKpm/k1n6uH8w1nv6Tc=</diagram></mxfile>

EDIT: The code that makes the call to fill the buffer:

    private static void Start(){
        // ...
        var selectionStrategy = new SelectionStrategyFactory().GetSelectionStrategy(selectionStrategyType);
        var selector = new Selector(selectionStrategy, logger, dateTime);
        selector.LinkTo(Pipeline.Head, Pipeline.ErrorBlock);
        selector.Process(); // fill the queue here
        Pipeline.Head.Complete();
        Pipeline.Tail.WaitForCompletion();
    }

Actually filling the buffer:

    class Selector{
        public void Process(){
            try{
                do{
                    foreach(var employee in selectionStrategy.Select()){
                        var result = new Result();
                        target.Post(result);  //Changing this to target.SendAsync fixed the issue
                    }
                } while(selectionStrategy.Stop()==false);
            }
            catch(Exception ex){
                //..
            }
            finally{
                //..
            }
        }
    }

Solution

  • Sorry for not posting additional code. I was finally able to figure out my issue.

    I was filling the matchBuffer (target) by calling target.Post(IResult). It turns out I needed target.SendAsync(IResult) instead. I can only assume my pipeline completion was propagating before the target had accepted every message, but (obviously) after all my messages had been sent.
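
The difference between the two calls is easy to reproduce in isolation. Post returns false immediately when the target declines a message, which is what a block with a BoundedCapacity does once it is full, and a declined message is simply lost unless the caller checks the return value. SendAsync instead returns a task that completes once the target has accepted or declined the message, so the sender can wait for room. The sketch below is only an illustration under assumptions: the class name, the capacity of 2, the int payload, and the slow consumer are hypothetical and are not taken from the pipeline in the question.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    // Offers `count` items with Post and returns how many the target accepted.
    public static int PostAll(ITargetBlock<int> target, int count)
    {
        int accepted = 0;
        for (int i = 0; i < count; i++)
        {
            // Post does not wait: a full bounded block declines the item.
            if (target.Post(i)) accepted++;
        }
        return accepted;
    }

    // Offers `count` items with SendAsync and returns how many were accepted.
    public static async Task<int> SendAllAsync(ITargetBlock<int> target, int count)
    {
        int accepted = 0;
        for (int i = 0; i < count; i++)
        {
            // SendAsync waits until the target has room (or declines for good).
            if (await target.SendAsync(i)) accepted++;
        }
        return accepted;
    }

    public static async Task Main()
    {
        var bounded = new DataflowBlockOptions { BoundedCapacity = 2 };

        // Nothing consumes from this buffer, so it is full after two items.
        var unread = new BufferBlock<int>(bounded);
        Console.WriteLine(PostAll(unread, 5)); // 2: the other three were dropped

        // A slow consumer drains this buffer, so SendAsync can wait for space.
        var buffer = new BufferBlock<int>(bounded);
        var consumer = new ActionBlock<int>(
            async item => await Task.Delay(10),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
        buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        Console.WriteLine(await SendAllAsync(buffer, 5)); // 5: nothing was lost

        buffer.Complete();
        await consumer.Completion;
    }
}
```

If Post has to stay, checking its return value and logging or retrying on false at least makes dropped messages visible.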