I need to process files which are sent to us by merchants. Each file name will contain the merchant's id. E.g. Products - 118763.csv, where the number is the merchant id.
I want to set up a dataflow to process these files. Multiple files can be processed simultaneously.
However, files for each individual merchant must be processed in order. That is to say, no two files for a particular merchant can be processed simultaneously.
I'm not sure how to model this with dataflow. I could monitor which merchants are currently being processed, but I don't know how to make sure I don't process two files from the same merchant, while still processing as many files at once as I can.
It seems that I need to filter what I take out of the queue, but I can only get one item at a time, and once I have it, what then? I could requeue it, but that could mean a lot of requeueing if there are only two files and they are both from the same merchant.
Any ideas?
You can create several ActionBlocks (as many as you want to process in parallel), where each one processes only a single file at a time, and use the number in the file name to choose the right ActionBlock. This allows parallel processing while ensuring that files with the same merchant id are processed sequentially.
Initialization:
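// An ActionBlock handles one item at a time by default,
// so files queued to the same block never run concurrently.
_actionBlocks = new ActionBlock<File>[DegreeOfParallelism];
for (var i = 0; i < _actionBlocks.Length; i++)
{
    _actionBlocks[i] = new ActionBlock<File>(file => ProcessFile(file));
}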
Usage:
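void ProcessFile(string path)
{
    // File names look like "Products - 118763.csv", so parse the part after the last '-'.
    var name = Path.GetFileNameWithoutExtension(path);
    var merchantId = int.Parse(name.Substring(name.LastIndexOf('-') + 1));
    // The same merchant id always maps to the same block, which preserves per-merchant order.
    _actionBlocks[(uint) merchantId % _actionBlocks.Length].Post(ReadFile(path));
}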
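For reference, here is a minimal self-contained sketch of the same routing idea that you could run end to end. The names MerchantRouter, ParseMerchantId, BlockIndex and RunAsync are hypothetical, the blocks carry plain path strings instead of a File type, and Task.Delay stands in for the real processing. Depending on your target framework you may need the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MerchantRouter
{
    // Hypothetical helper: "Products - 118763.csv" -> 118763.
    public static int ParseMerchantId(string path)
    {
        var name = Path.GetFileNameWithoutExtension(path);
        return int.Parse(name.Substring(name.LastIndexOf('-') + 1).Trim());
    }

    // The same merchant id always maps to the same block index.
    public static int BlockIndex(int merchantId, int blockCount) =>
        (int)((uint)merchantId % (uint)blockCount);

    // Posts every path to its merchant's block and returns, per block index,
    // the order in which that block handled its files.
    public static async Task<ConcurrentDictionary<int, ConcurrentQueue<string>>> RunAsync(
        string[] paths, int degreeOfParallelism)
    {
        var handled = new ConcurrentDictionary<int, ConcurrentQueue<string>>();
        var blocks = new ActionBlock<string>[degreeOfParallelism];
        for (var i = 0; i < blocks.Length; i++)
        {
            var index = i; // per-iteration copy so the lambda captures the right value
            blocks[i] = new ActionBlock<string>(async path =>
            {
                await Task.Delay(10); // stand-in for real file processing (assumption)
                handled.GetOrAdd(index, _ => new ConcurrentQueue<string>()).Enqueue(path);
            });
        }

        foreach (var path in paths)
            blocks[BlockIndex(ParseMerchantId(path), blocks.Length)].Post(path);

        // Signal that no more files are coming, then wait for every queue to drain.
        foreach (var block in blocks) block.Complete();
        await Task.WhenAll(blocks.Select(b => b.Completion));
        return handled;
    }
}
```

Calling `await MerchantRouter.RunAsync(paths, 4)` processes up to four files at once. One trade-off to be aware of: two different merchants whose ids map to the same index share a block, so their files also wait on each other. That is safe, just slightly less parallel than it could be.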