I have created a Dataflow pipeline using a BufferBlock, a TransformBlock and an ActionBlock. When an exception is thrown in the TransformBlock, the application deadlocks. I'm throttling data using BoundedCapacity.
My code is like this:
    public async Task PerformOperation()
    {
        var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
        var fetchApiResponse = new TransformBlock<ObjA, ObjA>((item) =>
        {
            // Call an API to fetch the result.
            // For some data this throws an exception.
            return item;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
        var finalBlock = new ActionBlock<ObjA>((item) =>
        {
            if (item != null)
            {
                SaveToDB(item);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
        bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
        fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await FetchData(bufferBlock);
        bufferBlock.Complete();
        await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
    }
    public async Task FetchData(BufferBlock<ObjA> bufferBlock)
    {
        List<ObjA> dataToProcessList = GetFromDB();
        foreach (var item in dataToProcessList)
        {
            await bufferBlock.SendAsync(item);
        }
    }
If an exception is thrown in the fetchApiResponse block, data stops moving through the pipeline and it deadlocks.
How do I handle exceptions in this pipeline?
Around 200,000 records are pushed to bufferBlock.
What is the best way to handle the exceptions without causing this deadlock?
UPDATE 1:
Added the FetchData method as well.
Thanks Binil
I haven't been able to go through the post by @Panagiotis Kanavos yet. Meanwhile, based on the comments, I have updated my code like this to handle the exception.
    public async Task PerformOperation()
    {
        try
        {
            var bufferBlock = new BufferBlock<ObjA>(new DataflowBlockOptions { BoundedCapacity = 1 });
            var fetchApiResponse = new TransformBlock<ObjA, ObjA>(async (item) =>
            {
                // Call an API to fetch the result.
                // For some data this throws an exception.
                try
                {
                    int apiResult = await apiCall();
                }
                catch (Exception ex)
                {
                    // New part: fault the upstream block so SendAsync starts returning false.
                    var dataflowBlock = (IDataflowBlock)bufferBlock;
                    dataflowBlock.Fault(ex);
                    throw; // rethrow without resetting the stack trace
                }
                return item;
            }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cancellationToken });
            var finalBlock = new ActionBlock<ObjA>((item) =>
            {
                if (item != null)
                {
                    SaveToDB(item);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1, CancellationToken = cancellationToken });
            bufferBlock.LinkTo(fetchApiResponse, new DataflowLinkOptions { PropagateCompletion = true });
            fetchApiResponse.LinkTo(finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
            await FetchData(bufferBlock);
            bufferBlock.Complete();
            await Task.WhenAll(fetchApiResponse.Completion, finalBlock.Completion);
        }
        catch (AggregateException aex)
        {
            // logging the exceptions in aex
        }
        catch (Exception ex)
        {
            // logging the exception
        }
    }
    public async Task FetchData(BufferBlock<ObjA> bufferBlock)
    {
        List<ObjA> dataToProcessList = GetFromDB();
        foreach (var item in dataToProcessList)
        {
            if (!await bufferBlock.SendAsync(item))
            {
                break; // SendAsync returned false: the block no longer accepts data, so stop pushing.
            }
        }
    }
This now stops the pipeline instead of deadlocking. Since I'm dealing with a lot of data, I'm planning to add a counter for the exceptions and stop the pipeline only if it exceeds a certain limit. If a small network glitch causes one API call to fail, the call might still succeed for the next item.
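A rough sketch of that counter idea, not a finished implementation. It assumes the System.Threading.Tasks.Dataflow package, uses int items in place of ObjA, and the names FailureBudgetPipeline, RunAsync, callApi and maxFailures are made up for illustration. Failed items are swallowed and passed on as null (which the final block already skips), and a shared CancellationTokenSource stops both blocks once the limit is exceeded.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class FailureBudgetPipeline
{
    // Hypothetical helper: runs items through a transform + action pipeline and
    // cancels both blocks once more than maxFailures items have failed.
    public static async Task<(int Saved, int Failed)> RunAsync(
        IEnumerable<int> items,
        Func<int, Task<int>> callApi, // stand-in for the real API call
        int maxFailures)
    {
        using var cts = new CancellationTokenSource();
        int failed = 0, saved = 0;

        var fetch = new TransformBlock<int, int?>(async item =>
        {
            try
            {
                return await callApi(item);
            }
            catch (Exception)
            {
                // Swallow the failure so the block stays alive;
                // cancel the pipeline only once the budget is exceeded.
                if (Interlocked.Increment(ref failed) > maxFailures)
                    cts.Cancel();
                return null; // marker for "skip this item"
            }
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2, MaxDegreeOfParallelism = 2, CancellationToken = cts.Token });

        var save = new ActionBlock<int?>(item =>
        {
            // Stand-in for SaveToDB; safe to count here because this block
            // runs with the default MaxDegreeOfParallelism of 1.
            if (item.HasValue) saved++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, CancellationToken = cts.Token });

        fetch.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items)
        {
            // SendAsync returns false once the block has been cancelled or faulted.
            if (!await fetch.SendAsync(item)) break;
        }
        fetch.Complete();

        try { await save.Completion; }
        catch (OperationCanceledException) { /* failure budget exceeded */ }

        return (saved, failed);
    }
}
```

Because two API calls can be in flight at once, the final failure count may end up slightly above the limit before the cancellation takes effect.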
I'll go through the new posts and update my code to make things better. Please provide inputs.
Thanks Binil