I am using .NET Reactive Extensions (Rx) with TPL Dataflow. Here is my pipeline:
I receive data points as a stream from an external source and transform them with Dataflow TransformBlocks. I then use the Rx Buffer operator to collect the streamed points for 1 second, and finally a Dataflow ActionBlock posts each buffered batch to a REST endpoint.
I would like to retry the REST post operation on transient errors. Where should I retry:
Going off the high-level overview provided, I think it would be easiest to retry in the final ActionBlock. Your ActionBlock would post and confirm success before finishing each batch. Depending on the volume of data, this approach lets you spin up as many ActionBlock instances as necessary without much concern over dropped items; there really shouldn't be any. If one or many ActionBlock instances fail their post, your items will keep streaming and will be distributed according to how you've set up your buffers and input queues, waiting for their chance on the wire.
Edit
Just some pseudo code, but this will take a batch of data points, the IEnumerable, and try to post it up to five times. The bounded capacity lets the ActionBlock queue up to 1000 batches, and the degree of parallelism lets it post several batches concurrently. Optionally, an unbounded BufferBlock could be added before the ActionBlock to hold all incoming batches. You'll need to take care that your producer, the stream, doesn't greatly outrun your consumer, the REST service.
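// Requires: using System.Threading.Tasks.Dataflow;
public void ConfigureFinalActionBlock() {
    // Optional unbounded buffer that holds every incoming batch.
    var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
        BoundedCapacity = DataflowBlockOptions.Unbounded
    });
    // Queue up to 1000 batches; run up to one post per core concurrently.
    var options = new ExecutionDataflowBlockOptions() {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
        var success = false;
        var attempts = 0;
        // Try the post up to five times, stopping at the first success.
        while (!success && attempts < 5) {
            attempts++;
            // MyApiPostAsync is assumed to return true when the post succeeds.
            success = await MyApiPostAsync(data);
        }
    }, options);
    dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
        PropagateCompletion = true
    });
}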
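The loop above retries whenever the post reports failure and does not pause between attempts. If the goal is to retry only on transient errors, one possible variation is sketched below. It is only a sketch under assumptions: TransientRetry, IsTransient, RunAsync and CreateRestBlock are hypothetical names, the post delegate is assumed to throw on failure instead of returning false, and treating HttpRequestException and TimeoutException as "transient" is an illustrative policy that you would adapt to your REST client.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransientRetry
{
    // Hypothetical policy: which exceptions are worth another attempt.
    public static bool IsTransient(Exception ex) =>
        ex is HttpRequestException || ex is TimeoutException;

    // Runs the operation up to maxAttempts times, doubling the delay after
    // each transient failure. Returns true on success and false once the
    // attempts are used up. Non-transient exceptions propagate to the caller.
    public static async Task<bool> RunAsync(Func<Task> operation, int maxAttempts, TimeSpan baseDelay)
    {
        var delay = baseDelay;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await operation();
                return true;
            }
            catch (Exception ex) when (IsTransient(ex))
            {
                if (attempt == maxAttempts) return false;
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
        return false;
    }
}

public static class Pipeline
{
    // Builds the final block; postAsync stands in for the real REST call.
    public static ActionBlock<IEnumerable<T>> CreateRestBlock<T>(Func<IEnumerable<T>, Task> postAsync)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1000,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        return new ActionBlock<IEnumerable<T>>(async batch =>
        {
            var posted = await TransientRetry.RunAsync(
                () => postAsync(batch), 5, TimeSpan.FromMilliseconds(200));
            if (!posted)
            {
                // Decide here what a permanently failed batch should do.
                Console.Error.WriteLine("Batch dropped after 5 attempts.");
            }
        }, options);
    }
}
```

Keep in mind that an exception escaping the ActionBlock delegate faults the block, after which it stops accepting batches, so you may want to catch and log non-transient errors inside the delegate as well.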