Tags: c#, tpl-dataflow, rx.net

Retry in TPL Dataflow with/without Rx


I am using .NET Reactive Extensions (Rx) together with TPL Dataflow. Here is my pipeline:

I receive data points as a stream from an external source and transform them using Dataflow TransformBlocks. After that, I use the Rx Buffer operator to buffer the streamed points for 1 second, and finally I use a Dataflow ActionBlock to post the buffered data points to a REST endpoint.

I would like to retry the REST POST operation on transient errors. Where should I retry?

  1. After buffer?
  2. Inside Action block?
  3. What would happen to continuous streaming at the time of retry? I don't want to miss any data.

Solution

  • Going only by the high-level overview provided, I think it would be easiest to retry in the final ActionBlock. Your ActionBlock would post a batch and confirm success before its delegate ends. Depending on the volume of data, this approach lets you spin up as many ActionBlocks as necessary without too much concern over dropped items; there really shouldn't be any. If one or many ActionBlock instances fail their post, your items will still be streaming in and will be distributed according to how you've set up your buffers and input queues, waiting for their chance on the wire.
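
To make the "no dropped items" point concrete, here is a minimal, self-contained sketch using only TPL Dataflow. It is an illustration under stated assumptions, not the pipeline from the question: `FakePostAsync` is a hypothetical stand-in for the REST call that fails twice per batch before succeeding, integer IDs stand in for real batches, and the `BoundedCapacity` of 4 and the 10 ms pause are arbitrary values chosen for the demo.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RetryInsideActionBlock
{
    public static async Task<List<int>> RunAsync(int batchCount)
    {
        var attemptsPerBatch = new ConcurrentDictionary<int, int>();
        var delivered = new ConcurrentQueue<int>();

        // Hypothetical stand-in for the REST call: the first two attempts
        // for each batch fail, the third succeeds.
        Task<bool> FakePostAsync(int batchId)
        {
            var attempt = attemptsPerBatch.AddOrUpdate(batchId, 1, (_, n) => n + 1);
            return Task.FromResult(attempt >= 3);
        }

        var restBlock = new ActionBlock<int>(async batchId =>
        {
            // Retry the same batch up to five times inside the block.
            for (var attempt = 1; attempt <= 5; attempt++)
            {
                if (await FakePostAsync(batchId))
                {
                    delivered.Enqueue(batchId);
                    return;
                }
                await Task.Delay(10); // brief pause before the next attempt
            }
            // All attempts failed: a real pipeline would log or park the batch here.
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        // The producer keeps sending while earlier batches are being retried.
        // SendAsync waits when the bounded queue is full instead of dropping.
        for (var id = 0; id < batchCount; id++)
        {
            await restBlock.SendAsync(id);
        }

        restBlock.Complete();
        await restBlock.Completion;
        return delivered.ToList();
    }

    public static async Task Main()
    {
        var delivered = await RunAsync(20);
        Console.WriteLine($"Delivered {delivered.Count} of 20 batches");
    }
}
```

Every batch fails twice and is still delivered, while the batches behind it simply wait in the block's input queue. Note that the producer uses `SendAsync`: with a bounded block, `Post` returns `false` and declines the item when the queue is full, which is one place where data could be lost.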

    Edit: This is just some pseudocode, but it takes a batch of data points (the IEnumerable) and tries to post it up to five times. The bounded capacity limits the ActionBlock's input queue to 1000 batches, and the degree of parallelism lets the block post several batches concurrently. Optionally, an unbounded BufferBlock can be added before the ActionBlock to hold all incoming batches. You'll need to take care that your producer (the stream) doesn't greatly outrun your consumer (the REST service).

        public void ConfigureFinalActionBlock() {
            var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
                BoundedCapacity = DataflowBlockOptions.Unbounded
            });
    
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 1000,
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };
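            var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
                const int maxAttempts = 5;
                var success = false;
                var attempts = 0;
                // Retry the same batch until it posts or the attempts run out.
                while (!success && attempts < maxAttempts) {
                    attempts++;
                    success = await MyApiPostAsync(data);
                    if (!success && attempts < maxAttempts) {
                        // Back off briefly so a struggling endpoint is not hammered.
                        await Task.Delay(TimeSpan.FromSeconds(attempts));
                    }
                }
                // If success is still false here, the batch is dropped; log or park it.
            }, options);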
    
            dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
                PropagateCompletion = true
            });