Tags: c#, tpl-dataflow

Blocked on block design for data flow


We have a data processing pipeline that we're trying to use the TPL Dataflow framework for.

Basic gist of the pipeline:

  1. Iterate through CSV files on the filesystem (roughly 10,000 of them)
  2. Verify we haven't already imported a file's contents; if we have, ignore it
  3. Iterate through the contents of a single CSV file (20,000-120,000 rows) and create a data structure that fits our needs.
  4. Batch up 100 of these new data-structure items and push them into a database
  5. Mark the CSV file as imported.

Now we have an existing Python file that does all the above in a very slow & painful way - the code is a mess.

Looking at TPL Dataflow, my thinking was the following (a rough sketch of the block declarations follows the list):

  1. BufferBlock<string> to Post all the files into
  2. TransformBlock<string, SensorDataDto> with a predicate to detect whether we should import this file
  3. TransformBlock<string, SensorDataDto> that reads through the CSV file and creates the SensorDataDto structure
  4. BatchBlock<SensorDataDto> is used within the TransformBlock delegate to batch up 100 requests.

    4.5. ActionBlock<SensorDataDto> to push the 100 records into the Database.

  5. ActionBlock to mark the CSV as imported.
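
Roughly, I imagine the block declarations looking something like this (a sketch only: ToSensorDataDto, InsertIntoDatabase and MarkCsvAsImported are placeholder helpers, and the exact input/output types are guesses):

// assumes: using System; using System.IO; using System.Linq; using System.Threading.Tasks.Dataflow;
var batchBlock = new BatchBlock<SensorDataDto>(100);

var bufferBlock = new BufferBlock<string>();

// reads the raw lines of a file that passed the ShouldImportFile predicate
var readCsvFile = new TransformBlock<string, (string Path, string[] Lines)>(
    path => (path, File.ReadAllLines(path)));

// turn each row into a SensorDataDto, post it into the batch block, pass the path on
var normaliseData = new TransformBlock<(string Path, string[] Lines), string>(file =>
{
    foreach (var line in file.Lines.Skip(1))        // skip the header row
        batchBlock.Post(ToSensorDataDto(line));     // placeholder row parser
    return file.Path;
});

// BatchBlock hands over SensorDataDto[] chunks of 100
var insertSensorDataBlock = new ActionBlock<SensorDataDto[]>(batch => InsertIntoDatabase(batch));

var updateCsvImport = new TransformBlock<string, string>(path =>
{
    MarkCsvAsImported(path);                        // placeholder helper
    return path;
});

var completionBlock = new ActionBlock<string>(path => Console.WriteLine("Imported " + path));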

I've created the first few operations and they're working (BufferBlock -> TransformBlock + predicate, processing only the files that haven't been imported yet), but I'm unsure how to continue the flow so that I can post 100 items at a time to the BatchBlock from within the TransformBlock and wire up the subsequent actions.

Does the basic gist look right, and how do I tackle the BatchBlock bits in a proper TPL Dataflow way?

bufferBlock.LinkTo(readCsvFile, ShouldImportFile)
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>())
readCsvFile.LinkTo(normaliseData)
normaliseData.LinkTo(updateCsvImport)
updateCsvImport.LinkTo(completionBlock)

batchBlock.LinkTo(insertSensorDataBlock)

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());

batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());

Inside the normaliseData method I'm calling BatchBlock.Post<..>(...). Is that a good pattern, or should it be structured differently? My problem is that I can only mark the file as imported after all of its records have been pushed through.

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();

If the batch size is 100 but only 80 items get pushed in, is there a way to drain those last 80?

I wasn't sure whether I should link the BatchBlock into the main pipeline; I do wait until both are finished.


Solution

  • First of all, you don't need to use Completion in that manner; you can use the PropagateCompletion option when linking:

    // with predicate
    bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile);
    // without predicate
    readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });
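
    Applied to the rest of your pipeline (block names taken from your snippet), the wiring could look roughly like this; completing the BatchBlock also flushes whatever partial batch is still buffered:

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    bufferBlock.LinkTo(readCsvFile, linkOptions, ShouldImportFile);
    bufferBlock.LinkTo(DataflowBlock.NullTarget<string>());
    readCsvFile.LinkTo(normaliseData, linkOptions);
    normaliseData.LinkTo(updateCsvImport, linkOptions);
    updateCsvImport.LinkTo(completionBlock, linkOptions);
    batchBlock.LinkTo(insertSensorDataBlock, linkOptions);

    bufferBlock.Complete();   // no more files will be posted

    // once every row has been posted, complete the batch block so the partial batch is flushed
    normaliseData.Completion.ContinueWith(_ => batchBlock.Complete());

    Task.WhenAll(completionBlock.Completion, insertSensorDataBlock.Completion).Wait();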
    

    Now, back to your problem with batches. Maybe you can use a JoinBlock<T1, T2> or BatchedJoinBlock<T1, T2> here, attaching them to your pipeline and gathering the results of the joins so that you get a full picture of the work being done. Alternatively, you can implement your own ITargetBlock<TInput> so you can consume the messages in your own way.
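
    For illustration only, attaching a BatchedJoinBlock might look roughly like this (the wiring and the InsertIntoDatabase / MarkCsvAsImported helpers are assumptions, not a complete solution):

    // rows go to Target1, per-file "finished reading" markers go to Target2
    var batchedJoin = new BatchedJoinBlock<SensorDataDto, string>(100);

    var consumeJoin = new ActionBlock<Tuple<IList<SensorDataDto>, IList<string>>>(pair =>
    {
        InsertIntoDatabase(pair.Item1);        // the rows gathered in this batch
        foreach (var path in pair.Item2)       // the file markers that arrived in this batch
            MarkCsvAsImported(path);
    });

    batchedJoin.LinkTo(consumeJoin, new DataflowLinkOptions { PropagateCompletion = true });

    // elsewhere in the pipeline:
    // batchedJoin.Target1.Post(row);
    // batchedJoin.Target2.Post(path);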

    According to the official docs, blocks are greedy by default and gather data from linked blocks as soon as it becomes available, so a join block may get stuck if one target is ready and another is not, or a batch block may sit at 80% of its batch size; you need to keep that in mind. For your own implementation, you can use the ITargetBlock<TInput>.OfferMessage method to receive messages from your sources.
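
    A bare-bones skeleton of such a custom target might look like this (a rough sketch only, not production code; it assumes using System, System.Collections.Generic, System.Threading.Tasks and System.Threading.Tasks.Dataflow):

    class BatchingTarget<T> : ITargetBlock<T>
    {
        private readonly List<T> _buffer = new List<T>();
        private readonly int _batchSize;
        private readonly Action<T[]> _onBatch;
        private readonly TaskCompletionSource<bool> _completion = new TaskCompletionSource<bool>();

        public BatchingTarget(int batchSize, Action<T[]> onBatch)
        {
            _batchSize = batchSize;
            _onBatch = onBatch;
        }

        // sources call this to hand over a message; we accept it and batch it ourselves
        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, T messageValue,
            ISourceBlock<T> source, bool consumeToAccept)
        {
            if (consumeToAccept)
            {
                bool consumed;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed) return DataflowMessageStatus.NotAvailable;
            }

            lock (_buffer)
            {
                _buffer.Add(messageValue);
                if (_buffer.Count >= _batchSize)
                {
                    _onBatch(_buffer.ToArray());
                    _buffer.Clear();
                }
            }
            return DataflowMessageStatus.Accepted;
        }

        public void Complete()
        {
            lock (_buffer)
            {
                if (_buffer.Count > 0) _onBatch(_buffer.ToArray());   // drain the partial batch
                _buffer.Clear();
            }
            _completion.TrySetResult(true);
        }

        public void Fault(Exception exception) => _completion.TrySetException(exception);

        public Task Completion => _completion.Task;
    }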

    BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches.

    In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.
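
    For reference, here is roughly how the two modes are configured, and how a partial batch (your "last 80" case) can be flushed:

    // default: greedy -- buffers everything it is offered
    var greedyBatch = new BatchBlock<SensorDataDto>(100);

    // non-greedy: postpones messages until a full batch can be formed
    var nonGreedyBatch = new BatchBlock<SensorDataDto>(100,
        new GroupingDataflowBlockOptions { Greedy = false });

    // emit whatever is currently buffered as a smaller batch (e.g. the last 80);
    // calling Complete() on the block also flushes the remaining items
    greedyBatch.TriggerBatch();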