I'm submitting a series of select statements (queries, thousands of them) to a single database synchronously and getting back one DataTable per query. (Note: this program knows the schema of the DB it is scanning only at run time, hence the use of DataTables.) The program runs on a client machine and connects to DBs on a remote machine. Running so many queries takes a long time. So, assuming that executing them asynchronously or in parallel will speed things up, I'm exploring TPL Dataflow (TDF). I want to use the TDF library because it seems to handle all of the concerns related to writing multi-threaded code that would otherwise need to be done by hand.
The code shown is based on http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. It's minimal and is just meant to help me understand the basic operations of TDF. Please know that I've read many blogs and coded many iterations trying to crack this nut.
Nonetheless, with this current iteration, I have one problem and a question:
Problem
The code is inside a button click method (using a UI, a user selects a machine, a SQL instance, and a database, and then kicks off the scan). The two lines with the await operator produce an error at build time: "The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'." I can't change the return type of the button click method. Do I need to somehow isolate the button click method from the async-await code?
Question
Although I've found beaucoup write-ups describing the basics of TDF, I can't find an example of how to get my hands on the output that each invocation of the TransformBlock produces (i.e., a DataTable). Although I want to submit the queries asynchronously, I do need to block until all queries submitted to the TransformBlock are completed. How do I get my hands on the series of DataTables produced by the TransformBlock and block until all queries are complete?
Note: I acknowledge that I have only one block now. At a minimum, I'll be adding a cancellation block and so do need/want to use TPL.
private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{
    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };
    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);
    // CreateTableQueryList gets a list of all tables from the DB and returns a list of
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);
    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5
    // records (one per field) will come back
    var transformBlock_SubmitTableQuery = new TransformBlock<String, DataTable>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });
    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }
    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;
    // Shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();
    // Await the completion of the task that produces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}
public async Task<DataTable> _SubmitSelectStatement(string queryString)
{
    try
    {
        // ...
        await Task.Run(() => sqlDataAdapter.Fill(dt));
        // process dt into the output DataTable I need
        return outputDt;
    }
    catch
    {
        throw;
    }
}
The cleanest way to retrieve the output of a TransformBlock is to perform a nested loop using the methods OutputAvailableAsync and TryReceive. It is a bit verbose, so you could consider encapsulating this functionality in an extension method ToListAsync:
public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    List<T> list = new();
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            list.Add(item);
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}
Then you could use the ToListAsync method like this:
private async void ToolStripButtonStart_Click(object sender, EventArgs e)
{
    // Event handlers must return void, so async void is the signature to use here
    TransformBlock<string, DataTable> transformBlock = new(async query => //...
    //...
    transformBlock.Complete();
    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with each dataTable
    }
}
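To try the same pattern outside of the WinForms app, here is a minimal console sketch. It assumes a project targeting .NET 6 or later with the System.Threading.Tasks.Dataflow NuGet package referenced, and it replaces the real database call with a hypothetical FakeQueryAsync method that just waits a little and returns a one-row DataTable named after the query.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package

public static class DataflowExtensions
{
    // The ToListAsync extension method from above, unchanged.
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (source.TryReceive(out T item))
            {
                list.Add(item);
            }
        }
        Debug.Assert(source.Completion.IsCompleted);
        await source.Completion.ConfigureAwait(false); // Propagate possible exception
        return list;
    }
}

public static class ToListDemo
{
    // Hypothetical stand-in for DataAccessLayer._SubmitSelectStatement:
    // waits a bit to simulate the remote call, then returns a one-row DataTable.
    private static async Task<DataTable> FakeQueryAsync(string query)
    {
        await Task.Delay(50);
        DataTable table = new(query); // the table is named after the query
        table.Columns.Add("QueryLength", typeof(int));
        table.Rows.Add(query.Length);
        return table;
    }

    public static async Task<List<string>> RunAsync()
    {
        TransformBlock<string, DataTable> block = new(query => FakeQueryAsync(query),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        foreach (string query in new[] { "q1", "q22", "q333" })
        {
            await block.SendAsync(query);
        }
        block.Complete(); // no more queries will be sent

        // Wait for all queries, then get every DataTable in a single list.
        List<string> tableNames = new();
        foreach (DataTable dataTable in await block.ToListAsync())
        {
            tableNames.Add(dataTable.TableName);
        }
        return tableNames;
    }
}
```

Because EnsureOrdered is true by default, the tables should come back in the order the queries were sent, even though up to two queries may be running at the same time.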
Note: this ToListAsync implementation is destructive, meaning that in case of an error the consumed messages are discarded. To make it non-destructive, just remove the await source.Completion line. In this case you'll have to remember to await the Completion of the block after processing the list with the consumed messages; otherwise you won't know whether the TransformBlock failed to process all of its input.
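A sketch of the non-destructive variant, together with the calling pattern it requires, might look like this. The method name ToListNonDestructiveAsync is hypothetical, the failing query is simulated by a TransformBlock<int, int> that throws for the input 3, and .NET 6+ with the Dataflow NuGet package is assumed again.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package

public static class NonDestructiveDemo
{
    // Hypothetical non-destructive variant: the same loop as ToListAsync, minus the
    // "await source.Completion" line, so a fault does not throw away the received items.
    public static async Task<List<T>> ToListNonDestructiveAsync<T>(
        this IReceivableSourceBlock<T> source, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (source.TryReceive(out T item))
            {
                list.Add(item);
            }
        }
        return list; // the caller is responsible for awaiting source.Completion
    }

    public static async Task<(List<int> Received, Exception? Error)> RunAsync()
    {
        // Simulated "query" that fails for the input 3.
        TransformBlock<int, int> block = new(x =>
            x == 3 ? throw new InvalidOperationException("Query 3 failed") : x * 10);
        foreach (int x in new[] { 1, 2, 3, 4 })
        {
            block.Post(x);
        }
        block.Complete();

        // Does not throw, even though the block faults.
        List<int> received = await block.ToListNonDestructiveAsync();

        // ... process the received items here ...

        // Now find out whether the block processed all of its input.
        try
        {
            await block.Completion;
            return (received, null);
        }
        catch (Exception ex)
        {
            return (received, ex);
        }
    }
}
```

Which of the results produced before the failure end up in the list depends on timing, but the error itself is not lost, because it surfaces when Completion is awaited.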
Alternative ways to retrieve the output of a dataflow block do exist. For example, this one by dcastro uses a BufferBlock as a buffer and is slightly more performant, but personally I find the approach above to be safer and more straightforward.
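The general idea of the BufferBlock approach can be sketched as follows. This is an illustrative sketch, not dcastro's actual code: the TransformBlock is linked to an unbounded BufferBlock, the TransformBlock's Completion is awaited, and then everything is drained in one call with TryReceiveAll. Awaiting the Completion of the BufferBlock itself would not work here, because a BufferBlock only completes after its buffered messages have been consumed. Squaring integers stands in for the real queries.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package

public static class BufferBlockDemo
{
    public static async Task<List<int>> RunAsync()
    {
        TransformBlock<int, int> transformBlock = new(async x =>
        {
            await Task.Delay(10); // simulate a query
            return x * x;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        // An unbounded BufferBlock that collects every output of the TransformBlock.
        BufferBlock<int> buffer = new();
        transformBlock.LinkTo(buffer);

        foreach (int x in new[] { 1, 2, 3 })
        {
            await transformBlock.SendAsync(x);
        }
        transformBlock.Complete();

        // Completes (or throws) once every output has been handed over to the buffer.
        await transformBlock.Completion;

        // Grab everything at once; TryReceiveAll returns false if the buffer is empty.
        return buffer.TryReceiveAll(out IList<int>? items)
            ? new List<int>(items)
            : new List<int>();
    }
}
```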
Instead of waiting for the completion of the block before retrieving the output, you could also retrieve it in a streaming manner, as an IAsyncEnumerable<T> sequence:
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    Debug.Assert(source.Completion.IsCompleted);
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}
This way you will be able to get your hands on each DataTable immediately after it has been cooked, without having to wait for the processing of all queries. To consume an IAsyncEnumerable<T>, you simply move the await before the foreach:
await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with each dataTable
}
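To see the streaming behavior in isolation, the following console sketch (again assuming .NET 6+ and the Dataflow NuGet package) copies the ToAsyncEnumerable method and feeds the block simulated queries of increasing duration, where the number is just a hypothetical delay in milliseconds. The printed timestamps should show each result being handled before the later queries have finished.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package

public static class StreamingDemo
{
    // Copy of the ToAsyncEnumerable extension method from above.
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IReceivableSourceBlock<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (source.TryReceive(out T item))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        Debug.Assert(source.Completion.IsCompleted);
        await source.Completion.ConfigureAwait(false); // Propagate possible exception
    }

    public static async Task<List<string>> RunAsync()
    {
        // Simulated queries: each input is how long the "query" takes, in ms.
        TransformBlock<int, string> block = new(async ms =>
        {
            await Task.Delay(ms);
            return $"done after {ms} ms";
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        foreach (int ms in new[] { 100, 200, 300 })
        {
            block.Post(ms);
        }
        block.Complete();

        List<string> results = new();
        Stopwatch stopwatch = Stopwatch.StartNew();
        // Each result is handled as soon as it is available (in input order,
        // since EnsureOrdered is true by default), not after all queries finish.
        await foreach (string result in block.ToAsyncEnumerable())
        {
            Console.WriteLine($"{stopwatch.ElapsedMilliseconds,5} ms: {result}");
            results.Add(result);
        }
        return results;
    }
}
```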
Advanced: Below is a more sophisticated version of the ToListAsync method that propagates all the errors of the underlying block, in the same direct way that they are propagated by methods like Task.WhenAll and Parallel.ForEachAsync. The original simple ToListAsync method wraps the errors in a nested AggregateException, using the Wait technique that is shown in this answer.
/// <summary>
/// Asynchronously waits for the successful completion of the specified source, and
/// returns all the received messages. In case the source completes with error,
/// the error is propagated and the received messages are discarded.
/// </summary>
public static Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> source,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    async Task<List<T>> Implementation()
    {
        List<T> list = new();
        while (await source.OutputAvailableAsync(cancellationToken)
            .ConfigureAwait(false))
            while (source.TryReceive(out T item))
                list.Add(item);
        await source.Completion.ConfigureAwait(false);
        return list;
    }
    return Implementation().ContinueWith(t =>
    {
        if (t.IsCanceled) return t;
        Debug.Assert(source.Completion.IsCompleted);
        if (source.Completion.IsFaulted)
        {
            TaskCompletionSource<List<T>> tcs = new();
            tcs.SetException(source.Completion.Exception.InnerExceptions);
            return tcs.Task;
        }
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
.NET 6 update: A new API, DataflowBlock.ReceiveAllAsync, was introduced in .NET 6, with this signature:
public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput>(
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);
It is similar to the aforementioned ToAsyncEnumerable method. The important difference is that the new API does not propagate the possible exception of the consumed source block after propagating all of its messages. This behavior is not consistent with the analogous API ReadAllAsync from the Channels library. I have reported this inconsistency on GitHub, and the issue is currently labeled by Microsoft as a bug.
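With the built-in ReceiveAllAsync, a reasonable precaution is therefore to await the block's Completion after the await foreach loop, so that a failure is not silently swallowed. A minimal sketch, assuming version 6.0 or later of the System.Threading.Tasks.Dataflow package and a simulated failure for the input 3. The whole sequence sits inside one try block, so the sketch also behaves sensibly if a future version of ReceiveAllAsync does propagate the error.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow NuGet package, 6.0+

public static class ReceiveAllDemo
{
    public static async Task<(List<int> Received, Exception? Error)> RunAsync()
    {
        // Simulated "query" that fails for the input 3.
        TransformBlock<int, int> block = new(x =>
            x == 3 ? throw new InvalidOperationException("Query 3 failed") : x + 100);
        foreach (int x in new[] { 1, 2, 3 })
        {
            block.Post(x);
        }
        block.Complete();

        List<int> received = new();
        try
        {
            await foreach (int item in block.ReceiveAllAsync())
            {
                received.Add(item);
            }
            // The loop above may end normally even if the block faulted,
            // so await Completion explicitly to surface the error.
            await block.Completion;
            return (received, null);
        }
        catch (Exception ex)
        {
            return (received, ex);
        }
    }
}
```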