I have a simple producer-consumer class using a BufferBlock
object to post messages adapted from TPL Dataflow Docs and my problem is that using it in a WPF app causes that the consumer receives data in the same main-thread while the console application executes it in a worker thread. Here is my code:
public class DataflowProducerConsumer
{
BufferBlock<int> _buffer;
public DataflowProducerConsumer()
{
_buffer = new BufferBlock<int>();
var consumer = UseBufferBlock(_buffer);
}
public void Produce(int number)
{
_buffer.Post(number);
var actionBlock = UseActionBlock();
actionBlock.Post(number);
}
public async Task UseBufferBlock(ISourceBlock<int> source)
{
while (await source.OutputAvailableAsync())
{
int data = source.Receive();
// ...process data asyncron
Console.WriteLine("BufferBlock-Thread {0} received: {1}", Thread.CurrentThread.ManagedThreadId, data);
}
}
public ITargetBlock<int> UseActionBlock()
{
return new ActionBlock<int>(data =>
{
Console.WriteLine(string.Format("ActionBlock-Thread: {0} received: {1}", Thread.CurrentThread.ManagedThreadId, data));
});
}
}
Test executed in main or on Click-event:
var obj = new DataflowProducerConsumer();
obj.Produce(19);
Well, I changed Post
to SendAsync
without success. And the ActionBlock
part works well. What's the difference? What am I doing wrong?
That's not related to TPL DataFlow itself, but rather to the following statement:
await source.OutputAvailableAsync()
await will capture current SynchronizationContext
and post continuation (the rest of the method after await) to it. If there is no synchronization context (like in console app) - it will post continuation to thread pool thread.
WPF has synchronization context, and posting to it is posting to UI thread (like Dispatcher.BeginInvoke()
), so that's what you observe.
To prevent capturing current synchronization context (and so - to always post continuation to thread pool thread), use ConfigureAwait(false)
:
while (await source.OutputAvailableAsync().ConfigureAwait(false))
{
int data = source.Receive();
// ...process data asyncron
Console.WriteLine("BufferBlock-Thread {0} received: {1}", Thread.CurrentThread.ManagedThreadId, data);
}
Also note that there is ReceiveAsync()
method, which you can use (and await) instead of synchronous Receive()
, though that is not related to the problem in question.