c# asynchronous async-await system.reactive parallel.foreach

Asynchronous only when there is less data


I am reading data from Azure tables (around 5k tables), collecting different metrics, and saving them back to other Azure tables, all asynchronously. The problem is that when there is a huge amount of data, which happens occasionally, the application starts hanging. The same code works fine with less data. The steps I perform (all asynchronous, using Rx, async, and await) are:

  1. Read all the table names from Azure
  2. Read all the tables' previous metric data (steps 1 and 2 run in parallel via Task.WhenAll)
  3. Get the data from each table, process it, and save it (Task.WhenAll)

What I want is to use asynchrony only as long as it doesn't make my application hang. If there is more data than can be handled, it should stop reading more tables' data and instead focus on finishing the processing of the data already available.

Does Parallel.ForEach take care of that?

The code, edited as per Stephen Cleary's suggestion, is still not working for all the tables, although it works for 500 tables.

I think it is the amount of data, rather than the number of threads, that brings the app (a console app) to a standstill. (One thread may end up retrieving a million rows in chunks of a thousand; each chunk is passed to a method and its count is added to a dictionary, so the chunk can be garbage collected when more memory is needed.) Or is it my implementation of SemaphoreSlim that is wrong?

public async Task CalculateMetricsForAllTablesAsync()
{
    var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
    var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();

    await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);

    var allWizardTableNames = allWizardTableNamesTask.Result;
    var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;

    var throttler = new SemaphoreSlim(10);
    var concurrentTableProcessingTasks = new ConcurrentStack<Task>();

    foreach (var tname in allWizardTableNames)
    {
        await throttler.WaitAsync();
        try
        {
            concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
        }
        finally
        {
            throttler.Release();
        }
    }

    await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);

}

private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
{
    var tableDataRetrieved = new TaskCompletionSource<bool>();
    var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();

    _fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
        .Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities), () => tableDataRetrieved.TrySetResult(true));

    await tableDataRetrieved.Task;
    await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
}

Solution

  • Since your async is wrapping Rx, I'd recommend throttling at the async level. You can do this by defining a SemaphoreSlim and wrapping your method logic within a WaitAsync/Release.

    Alternatively, consider TPL Dataflow. Dataflow has built-in options for throttling (MaxDegreeOfParallelism), and also interoperates naturally with async and Rx.
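
    As a rough illustration of both suggestions, here is a minimal sketch, not the asker's actual code. ThrottlingSketch, ProcessTableAsync, the Task.Delay call, and the limit of 10 are all hypothetical stand-ins; ProcessTableAsync only simulates the real table work and records how many calls overlap. In the semaphore version the slot is held until the awaited work has finished, so Release runs after processing completes rather than right after the task is started. The Dataflow version relies on ActionBlock from System.Threading.Tasks.Dataflow, which may need a separate NuGet package depending on the target framework.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingSketch
{
    private static readonly object Gate = new object();
    private static int _running;                          // calls in flight right now
    public static int MaxObserved { get; private set; }   // highest overlap seen so far

    public static void Reset()
    {
        lock (Gate) { _running = 0; MaxObserved = 0; }
    }

    // Hypothetical stand-in for ProcessTableDataAsync: simulates I/O and tracks concurrency.
    private static async Task ProcessTableAsync(string tableName)
    {
        lock (Gate) { _running++; MaxObserved = Math.Max(MaxObserved, _running); }
        await Task.Delay(20).ConfigureAwait(false);
        lock (Gate) { _running--; }
    }

    // Option 1: SemaphoreSlim. The slot is released only after the work has completed.
    public static async Task RunWithSemaphoreAsync(IEnumerable<string> tableNames, int limit)
    {
        var throttler = new SemaphoreSlim(limit);
        var tasks = tableNames.Select(async name =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await ProcessTableAsync(name).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    // Option 2: TPL Dataflow. MaxDegreeOfParallelism caps concurrent invocations of the delegate.
    public static async Task RunWithDataflowAsync(IEnumerable<string> tableNames, int limit)
    {
        Func<string, Task> work = ProcessTableAsync;
        var block = new ActionBlock<string>(
            work,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit });

        foreach (var name in tableNames)
        {
            block.Post(name);   // unbounded input queue by default, so items are queued, not dropped
        }

        block.Complete();
        await block.Completion.ConfigureAwait(false);
    }

    public static async Task Main()
    {
        var names = Enumerable.Range(0, 50).Select(i => "table" + i).ToList();

        Reset();
        await RunWithSemaphoreAsync(names, 10);
        Console.WriteLine("semaphore: max concurrent = " + MaxObserved);

        Reset();
        await RunWithDataflowAsync(names, 10);
        Console.WriteLine("dataflow: max concurrent = " + MaxObserved);
    }
}
```

    In both variants the reported maximum should stay at or below the chosen limit. Note that this only caps how many tables are processed at once; it does not by itself bound how much data a single table pulls into memory.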