We have a data processing pipeline that we're trying to use the TPL Dataflow framework for.
Basic gist of the pipeline: take the CSV files of sensor data, decide whether each file needs importing, read the records out of it, insert them into the database in batches, and mark the file as imported.
We have an existing Python file that does all the above in a very slow & painful way - the code is a mess.
My thinking was the following, looking at TPL Dataflow:

1. BufferBlock<string> to Post all the files into
2. TransformBlock<string, SensorDataDto> with a predicate to detect whether to import this file
3. TransformBlock<string, SensorDataDto> that reads through the CSV file and creates the SensorDataDto structure
4. BatchBlock<SensorDataDto> is used within the TransformBlock delegate to batch up 100 requests
4.5. ActionBlock<SensorDataDto> to push the 100 records into the Database
5. ActionBlock to mark the CSV as imported

I've created the first few operations and they're working (BufferBlock -> TransformBlock + Predicate && Process if it hasn't been), but I'm unsure how to continue the flow so that I can post 100 items to the BatchBlock from within the TransformBlock, and how to wire up the actions that follow.

Does this look right - basic gist - and how do I tackle the BufferBlock bits in a TPL-Dataflow-y way?
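For reference, here's roughly how I'm declaring the blocks - a simplified sketch, where ReadSensorData and InsertIntoDatabase are stand-ins for my real CSV-reading and database code, and the transform blocks are typed string -> string so the file path flows through:

using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Declared first so the normaliseData delegate can post into it.
// BatchBlock<T> emits T[] arrays, so the insert block takes SensorDataDto[].
var batchBlock = new BatchBlock<SensorDataDto>(100);
var insertSensorDataBlock = new ActionBlock<SensorDataDto[]>(batch => InsertIntoDatabase(batch));

var bufferBlock = new BufferBlock<string>();

// Passes the file path through; the ShouldImportFile predicate decides at
// link time whether a path enters this block at all.
var readCsvFile = new TransformBlock<string, string>(path => path);

// Reads the CSV, turns rows into SensorDataDto records, posts them to
// batchBlock, and forwards the path so the file can be marked as imported.
var normaliseData = new TransformBlock<string, string>(path =>
{
    foreach (var dto in ReadSensorData(path))
        batchBlock.Post(dto);
    return path;
});

// Stand-ins for the real CSV and database code:
IEnumerable<SensorDataDto> ReadSensorData(string path) => throw new NotImplementedException();
void InsertIntoDatabase(SensorDataDto[] batch) => throw new NotImplementedException();

record SensorDataDto { }

And the links I have so far (updateCsvImport and completionBlock omitted from the sketch above):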
bufferBlock.LinkTo(readCsvFile, ShouldImportFile);
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());
readCsvFile.LinkTo(normaliseData);
normaliseData.LinkTo(updateCsvImport);
updateCsvImport.LinkTo(completionBlock);
batchBlock.LinkTo(insertSensorDataBlock);
bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());
batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());
Inside the normaliseData method I'm calling BatchBlock.Post<..>(...) - is that a good pattern, or should it be structured differently? My problem is that I can only mark the file as being imported after all the records have been pushed through. Currently I wait for both pipelines like this:
Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();
If we have a batch of 100, what if only 80 are pushed in - is there a way to drain the last 80?
I wasn't sure if I should link the BatchBlock in the main pipeline, but I do wait until both are finished.
First of all, you don't need to use Completion in that manner; you can use the PropagateCompletion property during linking:
// with predicate
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile);
// without predicate
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });
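With that in place, the ContinueWith chain isn't needed at all - a sketch of the full wiring using your block names:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

bufferBlock.LinkTo(readCsvFile, linkOptions, ShouldImportFile);
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());   // discard skipped files
readCsvFile.LinkTo(normaliseData, linkOptions);
normaliseData.LinkTo(updateCsvImport, linkOptions);
updateCsvImport.LinkTo(completionBlock, linkOptions);
batchBlock.LinkTo(insertSensorDataBlock, linkOptions);

// Completing the head of the chain now flows down automatically:
bufferBlock.Complete();

Note that batchBlock still has to be completed by hand once all the files are read, because you Post into it manually rather than linking anything to it.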
Now, back to your problem with batches. You could use a JoinBlock<T1, T2> or a BatchedJoinBlock<T1, T2> here, attaching them to your pipeline and gathering the results of the joins so that you get the full picture of the work being done (see the sketch below). Alternatively, you could implement your own ITargetBlock<TInput> so you can consume the messages in your own way.
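A minimal sketch of the BatchedJoinBlock idea, along the lines of the "Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency" page in the docs - successful inserts go to Target1, exceptions to Target2, and each joined output accounts for a fixed number of attempts (the group size of 100 and the InsertRecord helper are illustrative):

// Each output tuple accounts for 100 outcomes, successes and failures combined.
var insertOutcomes = new BatchedJoinBlock<SensorDataDto, Exception>(100);

// Wherever a record is inserted, route the outcome to one of the two targets:
var tryInsert = new ActionBlock<SensorDataDto>(dto =>
{
    try
    {
        InsertRecord(dto);                    // hypothetical per-record insert
        insertOutcomes.Target1.Post(dto);     // success
    }
    catch (Exception ex)
    {
        insertOutcomes.Target2.Post(ex);      // failure
    }
});

// Downstream, each Tuple<IList<SensorDataDto>, IList<Exception>> is the
// full picture for a group of 100 attempts:
var report = new ActionBlock<Tuple<IList<SensorDataDto>, IList<Exception>>>(
    t => Console.WriteLine($"{t.Item1.Count} succeeded, {t.Item2.Count} failed"));
insertOutcomes.LinkTo(report);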
According to the official docs, the blocks are greedy by default and gather data from linked blocks as soon as it becomes available, so a join block may get stuck if one target is ready and the other is not, and a batch block may sit at 80% of its batch size; you need to keep that in mind. If you go the route of your own implementation, you can implement the ITargetBlock<TInput>.OfferMessage method to accept messages from your sources, as in the sketch below.
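Here's a minimal sketch of that shape - a wrapper that counts the messages it accepts while delegating the real work to an inner block; everything except the ITargetBlock<T> members themselves is illustrative:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class CountingTarget<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T> inner;
    private long count;

    public CountingTarget(ITargetBlock<T> inner) => this.inner = inner;

    public long Count => Interlocked.Read(ref count);

    // Sources call this to offer messages; forward the offer and count
    // the messages the inner block actually accepts.
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        var status = inner.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        if (status == DataflowMessageStatus.Accepted)
            Interlocked.Increment(ref count);
        return status;
    }

    public void Complete() => inner.Complete();
    public void Fault(Exception exception) => inner.Fault(exception);
    public Task Completion => inner.Completion;
}

A counter like this is one way to know when every record of a file has actually been accepted downstream, so the "mark as imported" step can be gated on it.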
BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches. In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.
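Two concrete knobs follow from that. BatchBlock<T> takes a GroupingDataflowBlockOptions to switch it into non-greedy mode, and it exposes TriggerBatch(), which answers your "drain the last 80" question directly. A sketch, reusing the names from the question:

// A non-greedy batch block: messages are postponed until a full
// batch of 100 can be formed.
var nonGreedyBatch = new BatchBlock<SensorDataDto>(100,
    new GroupingDataflowBlockOptions { Greedy = false });

// For a partially filled batch: TriggerBatch() makes the block emit
// whatever it currently holds, even though the batch isn't full.
// Call it once a file has been fully read:
batchBlock.TriggerBatch();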