I'm struggling with how to apply TPL Dataflow to my application.
I've got a bunch of parallel data operations I want to track and manage. Previously I was just using Tasks, but I'm trying to implement Dataflow to give me more control.
I'm composing a pipeline of tasks to, say, get the data and process it. Here's an example of a pipeline to get data, process data, and log it as complete:
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename =>
{
    // read the data file (takes a long time!)
    Console.WriteLine("Loading from " + filename);
    Thread.Sleep(2000);
    // return our result, for now just use the filename
    return filename + "_data";
});
TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data =>
{
    // process the data
    Console.WriteLine("Processiong data " + data);
    Thread.Sleep(2000);
    // return our result, for now just use the data string
    return data + "_processed";
});
TransformBlock<string, string> logProcessComplete = new TransformBlock<string, string>(data =>
{
    // Doesn't do anything to the data, just performs an 'action' (but still passes the data along, unlike ActionBlock)
    Console.WriteLine("Result " + data + " complete");
    return data;
});
I'm linking them together like this:
// create a pipeline
loadDataFromFile.LinkTo(prodcessData);
prodcessData.LinkTo(logProcessComplete);
I've been trying to follow this tutorial.
My confusion is that in the tutorial this pipeline seems to be a 'fire once' operation: it creates the pipeline, fires it off once, and it completes. This seems counter to how the Dataflow library is designed. I've read:
The usual way of using TPL Dataflow is to create all the blocks, link them together, and then start putting data in one end.
From "Concurrency in C# Cookbook" by Stephen Cleary.
But I'm not sure how to track the data after I've put it 'in one end'. I need to be able to get the processed data from multiple parts of the program. Say the user presses two buttons, one to get the data from "File1" and do something with it, and one to get the data from "File2". I think I'd need something like this:
public async Task loadFile1ButtonPress()
{
    loadDataFromFile.Post("File1");
    var data = await logProcessComplete.ReceiveAsync();
    Console.WriteLine($"Got data1: {data}");
}
public async Task loadFile2ButtonPress()
{
    loadDataFromFile.Post("File2");
    var data = await logProcessComplete.ReceiveAsync();
    Console.WriteLine($"Got data2: {data}");
}
If these are performed 'synchronously' it works just fine, as there's only one piece of information flowing through the pipeline:
Console.WriteLine("waiting for File 1");
await loadFile1ButtonPress();
Console.WriteLine("waiting for File 2");
await loadFile2ButtonPress();
Console.WriteLine("Done");
Produces the expected output:
waiting for File 1
Loading from File1
Processiong data File1_data
Result File1_data_processed complete
Got data1: File1_data_processed
waiting for File 2
Loading from File2
Processiong data File2_data
Result File2_data_processed complete
Got data2: File2_data_processed
Done
This makes sense to me; it's just doing them one at a time.
However, the point is I want to run these operations in parallel and asynchronously. If I simulate this (say, the user pressing both 'buttons' in quick succession) with:
Console.WriteLine("waiting");
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress());
Console.WriteLine("Done");
Does this work if the second operation takes longer than the first?
I was expecting both calls to return the first item. However, each one does return the correct item (originally this didn't work, but that was a bug I've since fixed).
I was thinking I could link an ActionBlock<string> to perform the action with the data, something like:
public async Task loadFile1ButtonPress()
{
    loadDataFromFile.Post("File1");
    // instead of var data = await logProcessComplete.ReceiveAsync();
    logProcessComplete.LinkTo(new ActionBlock<string>(data =>
    {
        Console.WriteLine($"Got data1: {data}");
    }));
}
But this changes the pipeline completely; now loadFile2ButtonPress won't work at all, as it's using the same pipeline.
Can I create multiple pipelines with the same blocks? Or should I be creating a whole new pipeline (and new blocks) for each 'operation'? That seems to defeat the point of using the Dataflow library at all.
Not sure whether this belongs on Stack Overflow or somewhere like Code Review; it might be a bit subjective.
If you need some events to happen after some data has been processed, you should expose your last block with AsObservable and add a small amount of code using Rx.NET:
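var observable = logProcessComplete.AsObservable();
// Subscribe(Action<T>) is an Rx.NET extension method; dispose the subscription to stop receiving items
var subscription = observable.Subscribe(i => Console.WriteLine(i));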
As has been said in the comments, you can link a block to more than one target block, using a predicate on each link. Note that in that case a message is delivered only to the first matching block. You can also create a BroadcastBlock, which delivers a copy of the message to each linked block.
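As a rough illustration of the BroadcastBlock option, here is a minimal sketch written as C# top-level statements. The block and variable names (broadcast, firstConsumer, secondConsumer and so on) are made up for the example and are not part of the question's pipeline. It assumes both consumers are unbounded ActionBlocks, so neither one ever declines a message.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical names throughout; these blocks are not part of the question's pipeline.
var firstSeen = new List<string>();
var secondSeen = new List<string>();

// The cloning function returns the same instance, which is fine for immutable strings.
var broadcast = new BroadcastBlock<string>(message => message);

// An ActionBlock handles one message at a time by default, so a plain List is safe here.
var firstConsumer = new ActionBlock<string>(message => firstSeen.Add(message));
var secondConsumer = new ActionBlock<string>(message => secondSeen.Add(message));

var propagate = new DataflowLinkOptions { PropagateCompletion = true };
broadcast.LinkTo(firstConsumer, propagate);
broadcast.LinkTo(secondConsumer, propagate);

broadcast.Post("File1_data_processed");
broadcast.Post("File2_data_processed");
broadcast.Complete();

// Both consumers finish once they have drained everything the broadcast block offered.
await Task.WhenAll(firstConsumer.Completion, secondConsumer.Completion);

Console.WriteLine("first:  " + string.Join(", ", firstSeen));
Console.WriteLine("second: " + string.Join(", ", secondSeen));
```

Keep in mind that a BroadcastBlock only holds on to its latest message, so a target with a bounded capacity that is busy may miss items; that is why the consumers here are left unbounded.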
Make sure that messages no other block wants are linked to NullTarget; otherwise they will stay in your pipeline forever and prevent it from completing.
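To make the predicate routing and the NullTarget catch-all concrete, here is a small sketch written as C# top-level statements. The names (results, file1Handler, file2Handler) and the StartsWith predicates are assumptions chosen for the example; your real routing condition will probably be different.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical names; "results" stands in for the last block of a pipeline.
var file1Results = new List<string>();
var file2Results = new List<string>();

var results = new BufferBlock<string>();
var file1Handler = new ActionBlock<string>(r => file1Results.Add(r));
var file2Handler = new ActionBlock<string>(r => file2Results.Add(r));

// Links are tried in the order they were added; the first matching predicate wins.
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
results.LinkTo(file1Handler, propagate, r => r.StartsWith("File1"));
results.LinkTo(file2Handler, propagate, r => r.StartsWith("File2"));

// Catch-all added last: anything no predicate accepted is silently discarded.
results.LinkTo(DataflowBlock.NullTarget<string>());

results.Post("File2_data_processed");
results.Post("File3_data_processed"); // nobody wants this one, so it goes to NullTarget
results.Post("File1_data_processed");
results.Complete();

await Task.WhenAll(file1Handler.Completion, file2Handler.Completion);

Console.WriteLine("File1 handler got: " + string.Join(", ", file1Results));
Console.WriteLine("File2 handler got: " + string.Join(", ", file2Results));
```

Without the NullTarget link, the "File3" message would sit at the head of the buffer, the message behind it would never be offered, and the final await would never return.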
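Check that your pipeline handles completion correctly. LinkTo does not propagate completion by default; it is propagated only over links created with DataflowLinkOptions.PropagateCompletion set to true, so when a block has multiple links, make sure every target that needs to finish is linked that way.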
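For completion handling, a sketch along these lines may help. It is written as C# top-level statements and uses a cut-down pipeline with hypothetical block names (load, process, sink) rather than the exact blocks from the question. Each link is created with PropagateCompletion, so calling Complete on the head eventually completes the tail.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical, simplified stand-ins for the question's load and process stages.
var load = new TransformBlock<string, string>(filename => filename + "_data");
var process = new TransformBlock<string, string>(data => data + "_processed");

var completed = new List<string>();
var sink = new ActionBlock<string>(result => completed.Add(result));

// Without PropagateCompletion the downstream blocks would never be told to finish.
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
load.LinkTo(process, propagate);
process.LinkTo(sink, propagate);

load.Post("File1");
load.Post("File2");
load.Complete();       // signal that no more input will arrive

await sink.Completion; // finishes once every item has flowed through

Console.WriteLine(string.Join(", ", completed));
```

Note that the last block is an ActionBlock. A TransformBlock at the end of the chain only completes after something has taken all of its output, which is another way a pipeline can appear to hang.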