I am reading data from Azure tables (around 5k tables), collecting different metrics, and saving them back to other Azure tables, all asynchronously. The problem is that when there is a huge amount of data, which happens occasionally, the application starts hanging. The same code works fine with less data. The steps are all asynchronous (using Rx, async, and await), and the fanned-out work in them is awaited with Task.WhenAll.

What I want is to use asynchrony only up to the point where it does not make my application hang. If there is more data than can be handled, it should stop reading more tables' data and instead focus on finishing the processing of the data it already has.
Does Parallel.ForEach take care of that?

The code below, edited as per Stephen Cleary's answer, is still not working for all the tables, although it works for 500 tables.

I think it is the amount of data, rather than the number of threads, that brings the app (a console app) to a standstill. (One thread may end up retrieving a million rows, in chunks of a thousand; each chunk is passed to a method and its count is added to a dictionary, so the chunk can be garbage collected when more memory is needed.) Or is it the way I have implemented SemaphoreSlim that is wrong?

    public async Task CalculateMetricsForAllTablesAsync()
    {
        var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
        var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();
        await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);
        var allWizardTableNames = allWizardTableNamesTask.Result;
        var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;
        var throttler = new SemaphoreSlim(10);
        var concurrentTableProcessingTasks = new ConcurrentStack<Task>();
        foreach (var tname in allWizardTableNames)
        {
            await throttler.WaitAsync();
            try
            {
                concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
            }
            finally
            {
                throttler.Release();
            }
        }
        await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);
    }

    private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
    {
        var tableDataRetrieved = new TaskCompletionSource<bool>();
        var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();
        _fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
            .Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities), () => tableDataRetrieved.TrySetResult(true));
        await tableDataRetrieved.Task;
        await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
    }

Since your async is wrapping Rx, I'd recommend throttling at the async level. You can do this by defining a SemaphoreSlim and wrapping your method logic within a WaitAsync/Release.
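
A minimal sketch of that pattern, not the asker's actual code: the semaphore is acquired and released inside the per-item method, so a slot stays occupied until the awaited work completes. (Releasing right after a task has merely been started, without awaiting it, frees the slot immediately and throttles nothing.) ThrottledRunner, RunAsync, ThrottleDemo, and the Task.Delay stand-in for the table processing are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Runs `work` once per item, with at most `maxConcurrency` items in flight.
    public static async Task RunAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        using (var throttler = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                // Wait for a free slot before starting this item's work.
                await throttler.WaitAsync().ConfigureAwait(false);
                try
                {
                    // The slot stays taken until the work has actually finished.
                    await work(item).ConfigureAwait(false);
                }
                finally
                {
                    throttler.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }
}

public static class ThrottleDemo
{
    // Returns the highest number of work items observed running at once.
    public static async Task<int> MeasurePeakAsync(int tableCount, int maxConcurrency)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;
        var tableNames = Enumerable.Range(1, tableCount).Select(i => "Table" + i);

        await ThrottledRunner.RunAsync(tableNames, maxConcurrency, async name =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            // Hypothetical stand-in for reading, aggregating, and saving one table.
            await Task.Delay(20).ConfigureAwait(false);
            lock (gate) { inFlight--; }
        }).ConfigureAwait(false);

        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakAsync(50, 10);
        Console.WriteLine("Peak concurrency: " + peak); // never more than 10
    }
}
```

In the question's code, the equivalent change would be to move the WaitAsync/Release pair into ProcessTableDataAsync, around the two awaits, rather than around the call that starts it.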
Alternatively, consider TPL Dataflow. Dataflow has built-in options for throttling (MaxDegreeOfParallelism), and also interoperates naturally with async and Rx.
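
As a rough sketch of the Dataflow option, assuming the System.Threading.Tasks.Dataflow library is referenced (on .NET Framework it is typically added as a NuGet package): an ActionBlock runs an async delegate per posted item and caps concurrency through MaxDegreeOfParallelism. DataflowDemo, ProcessAllAsync, and the Task.Delay stand-in are hypothetical and only illustrate the shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    // Processes every table name with at most `maxParallel` running at once,
    // and returns how many tables were processed.
    public static async Task<int> ProcessAllAsync(
        IEnumerable<string> tableNames, int maxParallel)
    {
        int processed = 0;

        var block = new ActionBlock<string>(
            async name =>
            {
                // Hypothetical stand-in for reading, aggregating, and saving one table.
                await Task.Delay(20).ConfigureAwait(false);
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // Queue all table names; the block pulls them as slots free up.
        foreach (var tableName in tableNames)
        {
            block.Post(tableName);
        }

        // Signal that no more items are coming, then wait for the queue to drain.
        block.Complete();
        await block.Completion.ConfigureAwait(false);

        return processed;
    }

    public static async Task Main()
    {
        var tableNames = Enumerable.Range(1, 25).Select(i => "Table" + i);
        int processed = await ProcessAllAsync(tableNames, 10);
        Console.WriteLine("Processed tables: " + processed);
    }
}
```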