I am learning about the TPL Dataflow Library. So far it's exactly what I was looking for.

I've created a simple class (below) that performs the following functions:

- In `ImportPropertiesForBranch` I go to a 3rd-party API and get a list of properties.
- I then use `Parallel.For` to `SendAsync` the property data into my `propertyBufferBlock`.
- `propertyBufferBlock` is linked to a `propertyXmlBlock` (which itself is a `TransformBlock`).
- `propertyXmlBlock` then (asynchronously) goes back to the API (using the API endpoint supplied in the property data) and fetches the property XML for deserialization.
- Eventually, more `TransformBlock`s are meant to persist it to a data store.

So my questions are:

1. Are there any potential bottlenecks or areas of the code that could be troublesome?
2. Is it OK to `await` async calls inside a `TransformBlock`, or is this a bottleneck?
3. Although the code works, I am worried about the buffering and asynchronicity of the `Parallel.For`, `BufferBlock` and async in the `TransformBlock`. I'm not sure it's the best way and I may be mixing up some concepts.

Any guidance, improvements and pitfall advice welcomed.

    using System.Diagnostics;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using My.Interfaces;
    using My.XmlService.Models;

    namespace My.ImportService
    {
        public class ImportService
        {
            private readonly IApiService _apiService;
            private readonly IXmlService _xmlService;
            private readonly IRepositoryService _repositoryService;

            public ImportService(IApiService apiService,
                IXmlService xmlService,
                IRepositoryService repositoryService)
            {
                _apiService = apiService;
                _xmlService = xmlService;
                _repositoryService = repositoryService;
                ConstructPipeline();
            }

            private BufferBlock<propertiesProperty> propertyBufferBlock;
            private TransformBlock<propertiesProperty, string> propertyXmlBlock;
            private TransformBlock<string, propertyType> propertyDeserializeBlock;
            private ActionBlock<propertyType> propertyCompleteBlock;

            public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
            {
                var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);
                if (string.IsNullOrEmpty(propertyListXml))
                    return false;

                var properties = _xmlService.DeserializePropertyList(propertyListXml);
                if (properties?.property == null || properties.property.Length == 0)
                    return false;

                // limited to the first 20 for testing
                Parallel.For(0, 20,
                    new ParallelOptions {MaxDegreeOfParallelism = 3},
                    i => propertyBufferBlock.SendAsync(properties.property[i]));

                propertyBufferBlock.Complete();
                await propertyCompleteBlock.Completion;
                return true;
            }

            private void ConstructPipeline()
            {
                propertyBufferBlock = GetPropertyBuffer();
                propertyXmlBlock = GetPropertyXmlBlock();
                propertyDeserializeBlock = GetPropertyDeserializeBlock();
                propertyCompleteBlock = GetPropertyCompleteBlock();

                propertyBufferBlock.LinkTo(
                    propertyXmlBlock,
                    new DataflowLinkOptions {PropagateCompletion = true});

                propertyXmlBlock.LinkTo(
                    propertyDeserializeBlock,
                    new DataflowLinkOptions {PropagateCompletion = true});

                propertyDeserializeBlock.LinkTo(
                    propertyCompleteBlock,
                    new DataflowLinkOptions {PropagateCompletion = true});
            }

            private BufferBlock<propertiesProperty> GetPropertyBuffer()
            {
                return new BufferBlock<propertiesProperty>();
            }

            private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
            {
                return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                    {
                        Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                        var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                        return propertyXml;
                    },
                    new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = 1,
                        BoundedCapacity = 2
                    });
            }

            private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
            {
                return new TransformBlock<string, propertyType>(xmlAsString =>
                    {
                        Debug.WriteLine($"deserializing");
                        var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                        return propertyType;
                    },
                    new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = 1,
                        BoundedCapacity = 2
                    });
            }

            private ActionBlock<propertyType> GetPropertyCompleteBlock()
            {
                return new ActionBlock<propertyType>(propertyType =>
                    {
                        Debug.WriteLine($"complete {propertyType.id}");
                        Debug.WriteLine(propertyType.address.display);
                    },
                    new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = 1,
                        BoundedCapacity = 2
                    });
            }
        }
    }

> Are there any potential bottlenecks or areas of the code that could be troublesome?

In general your approach looks good. The potential bottleneck is that you limit the parallel processing of your blocks with `MaxDegreeOfParallelism = 1`. Based on the description of the problem, each item can be processed independently of the others, so you can process multiple items at a time.
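
To make the effect of `MaxDegreeOfParallelism` concrete, here is a minimal sketch rather than a drop-in replacement for your block. `FakeFetchAsync`, the 200 ms delay, the item count and the value 4 are all assumptions invented for the illustration (the real call would be your `_apiService.GetXmlAsStringAsync`), and it uses C# top-level statements for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the real API call: each "download" takes about 200 ms.
static async Task<string> FakeFetchAsync(int id)
{
    await Task.Delay(200);
    return $"<property id=\"{id}\" />";
}

var fetchBlock = new TransformBlock<int, string>(
    async id => await FakeFetchAsync(id),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // assumed value; pick what the API tolerates
        BoundedCapacity = 8
    });

// The ActionBlock keeps its default MaxDegreeOfParallelism of 1,
// so appending to a plain List<string> is safe here.
var results = new List<string>();
var collectBlock = new ActionBlock<string>(xml => results.Add(xml));

fetchBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

var stopwatch = Stopwatch.StartNew();
for (var id = 0; id < 8; id++)
{
    await fetchBlock.SendAsync(id);
}
fetchBlock.Complete();
await collectBlock.Completion;

Console.WriteLine($"{results.Count} items in {stopwatch.ElapsedMilliseconds} ms");
```

With a degree of parallelism of 1 the eight simulated downloads would run back to back; with 4 they overlap, so the elapsed time should drop accordingly. The results still arrive in input order, because `TransformBlock` preserves ordering by default.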

> Is it ok to await async calls inside a `TransformBlock` or is this a bottleneck?

It is perfectly fine, because TPL Dataflow supports async operations.

> Although the code works, I am worried about the buffering and asynchronicity of the `Parallel.For`, `BufferBlock` and async in the `TransformBlock`. I'm not sure it's the best way and I may be mixing up some concepts.

One potential problem that could make you shoot yourself in the foot is calling an async method inside `Parallel.For` and then calling `propertyBufferBlock.Complete();`. `Parallel.For` does not support async delegates: the way you invoke it, it calls `propertyBufferBlock.SendAsync` and moves on without waiting for the returned task to complete. That means that by the time `Parallel.For` exits, some sends might still be pending, with their items not yet added to the buffer block. (Your `BufferBlock` is currently unbounded and accepts items immediately, so you are unlikely to hit this today, but it surfaces as soon as the block gets a `BoundedCapacity`.) If you then call `propertyBufferBlock.Complete();`, those pending items are declined and never processed. The returned tasks complete with `false` rather than `true`, and because nothing awaits them, the loss goes unnoticed.
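
A small experiment shows what happens to a send that is still pending when `Complete()` is called. This is an isolated sketch, not your pipeline: it assumes a `BufferBlock<int>` with `BoundedCapacity = 1` and no consumer, chosen only so that the second `SendAsync` is forced to wait. The variable names are made up for the illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A bounded buffer with no consumer attached, so it fills up after one item.
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Neither task is awaited yet, which mimics what happens inside Parallel.For.
Task<bool> first = buffer.SendAsync(1);  // fits into the buffer, accepted at once
Task<bool> second = buffer.SendAsync(2); // buffer is full, so this send is postponed

// Completing the block while a send is still pending.
buffer.Complete();

bool firstAccepted = await first;
bool secondAccepted = await second;

Console.WriteLine($"first accepted:  {firstAccepted}");
Console.WriteLine($"second accepted: {secondAccepted}");
```

The first send is accepted, while the postponed one finishes with `false` once the block stops accepting input. Code that never looks at the `Task<bool>` returned by `SendAsync` has no way of noticing the second case.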

You could use `ForEachAsync` from this blog post to ensure that all items are added to the block before completing it. But if you are still limiting processing to one operation at a time, you can simply add the items one at a time. I am not sure how `propertyBufferBlock.SendAsync` is implemented, but it may internally restrict adding to one item at a time, in which case adding in parallel would not make any sense.
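
As a sketch of the "one at a time" option: awaiting each `SendAsync` in a plain loop guarantees that every item has been accepted before `Complete()` is called, and it also respects any `BoundedCapacity` downstream. The block names, the sample strings and the 50 ms delay below are placeholders invented for the illustration, and top-level statements are used for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 2 });

// Hypothetical slow consumer; it runs one item at a time (the default),
// so appending to a plain List<string> is safe.
var processed = new List<string>();
var worker = new ActionBlock<string>(
    async item =>
    {
        await Task.Delay(50); // simulated work
        processed.Add(item);
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

buffer.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

string[] items = { "a", "b", "c", "d", "e", "f" };
foreach (var item in items)
{
    // Awaiting here waits for free capacity instead of firing and forgetting.
    bool accepted = await buffer.SendAsync(item);
    if (!accepted)
    {
        throw new InvalidOperationException($"Item '{item}' was declined.");
    }
}

// Only now is it safe to signal that no more items will arrive.
buffer.Complete();
await worker.Completion;

Console.WriteLine($"processed {processed.Count} of {items.Length} items");
```

Because the loop does not move on until a send has completed, `Complete()` cannot overtake a pending item, and a declined send fails loudly instead of disappearing.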