Tags: c#, multithreading, async-await, task

Multi-threading in a foreach loop


I have read a few Stack Overflow threads about multi-threading in a foreach loop, but I am not sure I understand it and am using it correctly.
I have tried multiple scenarios, but I am not seeing much of a performance increase.

Here is what I believe runs asynchronous tasks, but sequentially, one at a time in the loop, on a single thread:

Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }
}
stopWatch.Stop();

Here is what I hoped would run asynchronously (still using a single thread). I would have expected some speed improvement already here:

List<Task<ExchangeTicker>> exchTkrs = new List<Task<ExchangeTicker>>();
stopWatch.Start();

foreach (IExchangeAPI selectedApi in selectedApis)
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        exchTkrs.Add(selectedApi.GetTickerAsync(symbol));
    }
}

ExchangeTicker[] retTickers = await Task.WhenAll(exchTkrs);
stopWatch.Stop();

Here is what I hoped would run asynchronously on multiple threads:

stopWatch.Start();

Parallel.ForEach(selectedApis, async (IExchangeAPI selectedApi) =>
{
    if (exchangeSymbols.TryGetValue(selectedApi.Name, out symbol))
    {
        ticker = await selectedApi.GetTickerAsync(symbol);
    }
});
stopWatch.Stop();

The stopwatch results are printed as follows:

Console.WriteLine("Time elapsed (ns): {0}", stopWatch.Elapsed.TotalMilliseconds * 1000000);

Console outputs:

Time elapsed (ns): 4183308100
Time elapsed (ns): 4183946299.9999995
Time elapsed (ns): 4188032599.9999995

Now, the speed improvement looks minuscule. Am I doing something wrong, or is that more or less what I should expect? I suppose writing to files would be a better way to check that.
Would you mind also confirming I am interpreting the different use cases correctly?

Finally, using a foreach loop in order to get the ticker from multiple platforms in parallel may not be the best approach. Suggestions on how to improve this would be welcome.

EDIT

Note that I am using the ExchangeSharp code base that you can find here

Here is what the GetTickerAsync() method looks like:

public virtual async Task<ExchangeTicker> GetTickerAsync(string marketSymbol)
{
    marketSymbol = NormalizeMarketSymbol(marketSymbol);
    return await Cache.CacheMethod(MethodCachePolicy, async () => await OnGetTickerAsync(marketSymbol), nameof(GetTickerAsync), nameof(marketSymbol), marketSymbol);
}

For the Kraken API, you then have:

protected override async Task<ExchangeTicker> OnGetTickerAsync(string marketSymbol)
{
    JToken apiTickers = await MakeJsonRequestAsync<JToken>("/0/public/Ticker", null, new Dictionary<string, object> { { "pair", NormalizeMarketSymbol(marketSymbol) } });
    JToken ticker = apiTickers[marketSymbol];
    return await ConvertToExchangeTickerAsync(marketSymbol, ticker);
}

And the caching method:

public static async Task<T> CacheMethod<T>(this ICache cache, Dictionary<string, TimeSpan> methodCachePolicy, Func<Task<T>> method, params object?[] arguments) where T : class
{
    await new SynchronizationContextRemover();
    methodCachePolicy.ThrowIfNull(nameof(methodCachePolicy));
    if (arguments.Length % 2 == 0)
    {
        throw new ArgumentException("Must pass function name and then name and value of each argument");
    }
    string methodName = (arguments[0] ?? string.Empty).ToStringInvariant();
    string cacheKey = methodName;
    for (int i = 1; i < arguments.Length;)
    {
        cacheKey += "|" + (arguments[i++] ?? string.Empty).ToStringInvariant() + "=" + (arguments[i++] ?? string.Empty).ToStringInvariant("(null)");
    }
    if (methodCachePolicy.TryGetValue(methodName, out TimeSpan cacheTime))
    {
        return (await cache.Get<T>(cacheKey, async () =>
        {
            T innerResult = await method();
            return new CachedItem<T>(innerResult, CryptoUtility.UtcNow.Add(cacheTime));
        })).Value;
    }
    else
    {
        return await method();
    }
}

Solution

  • First, it should be pointed out that what you are trying to achieve is better performance, not asynchrony, and that you are trying to achieve it by running multiple operations concurrently, not in parallel. To keep the explanation simple, I'll use a simplified version of your code and assume that each operation is a direct web request, with no intermediate caching layer and no dependencies on values stored in dictionaries.

    foreach (var symbol in selectedSymbols)
    {
        var ticker = await selectedApi.GetTickerAsync(symbol);
    }
    

    The above code runs the operations sequentially. Each operation starts after the completion of the previous one.

    var tasks = new List<Task<ExchangeTicker>>();
    foreach (var symbol in selectedSymbols)
    {
        tasks.Add(selectedApi.GetTickerAsync(symbol));
    }
    var tickers = await Task.WhenAll(tasks);
    

    The above code runs the operations concurrently. The operations are started one right after the other, without waiting for the previous one to complete. The total duration is expected to be roughly the duration of the longest-running operation.
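
    The difference between the two versions can be checked without any network access. The following is only a sketch: FakeGetTickerAsync and its delays are hypothetical stand-ins for the real requests, and it assumes .NET 6 or later (top-level statements). Each measurement uses Restart, because Stopwatch.Start on a stopwatch that has already run resumes from the accumulated elapsed time instead of starting from zero.

    ```csharp
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;

    // Hypothetical stand-in for a ticker request: waits, then returns a fake result.
    static async Task<string> FakeGetTickerAsync(string symbol, int delayMs)
    {
        await Task.Delay(delayMs); // simulated network latency (assumption)
        return symbol + ":ok";
    }

    (string Symbol, int DelayMs)[] requests = { ("BTC", 300), ("ETH", 200), ("XRP", 100) };

    // Sequential: each request starts only after the previous one has completed.
    var stopWatch = Stopwatch.StartNew();
    foreach (var request in requests)
    {
        await FakeGetTickerAsync(request.Symbol, request.DelayMs);
    }
    long sequentialMs = stopWatch.ElapsedMilliseconds;

    // Restart() zeroes the elapsed time; Start() alone would keep accumulating.
    stopWatch.Restart();

    // Concurrent: all requests are started first, then awaited together.
    var tasks = requests.Select(r => FakeGetTickerAsync(r.Symbol, r.DelayMs)).ToList();
    string[] tickers = await Task.WhenAll(tasks);
    long concurrentMs = stopWatch.ElapsedMilliseconds;

    Console.WriteLine($"Sequential: {sequentialMs} ms");
    Console.WriteLine($"Concurrent: {concurrentMs} ms");
    ```

    With these delays, the sequential loop should take roughly the sum of the delays and the concurrent version roughly the longest one. Exact numbers will vary by machine.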

    Parallel.ForEach(selectedSymbols, async symbol =>
    {
        var ticker = await selectedApi.GetTickerAsync(symbol);
    });
    

    The above code also runs the operations concurrently, like the previous version with Task.WhenAll. It offers no advantage and has the huge disadvantage that you no longer have a way to await the completion of the operations. The Parallel.ForEach method returns immediately after launching the operations, because the Parallel class doesn't understand async delegates (it does not accept Func<Task> lambdas). Essentially, the lambdas in there are async void: they run out of control, and an unhandled exception in any of them will bring down the process.
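
    The early return is easy to observe. The sketch below is illustrative only: the 500 ms Task.Delay is a hypothetical stand-in for the web request, and the code assumes .NET 6 or later. It counts how many operations have finished at the moment Parallel.ForEach returns.

    ```csharp
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    int completed = 0;
    var stopWatch = Stopwatch.StartNew();

    // The async lambda is converted to Action<int>, which makes it async void.
    // Parallel.ForEach treats each iteration as finished at its first await.
    Parallel.ForEach(new[] { 1, 2, 3 }, async id =>
    {
        await Task.Delay(500); // simulated web request (assumption)
        Interlocked.Increment(ref completed);
    });

    int completedAtReturn = Volatile.Read(ref completed);
    Console.WriteLine($"Parallel.ForEach returned after {stopWatch.ElapsedMilliseconds} ms, completed: {completedAtReturn}");

    // Demo only: wait long enough for the orphaned operations to finish.
    await Task.Delay(1500);
    int completedLater = Volatile.Read(ref completed);
    Console.WriteLine($"Completed after waiting: {completedLater}");
    ```

    On a typical run the first line reports zero completed operations after a few milliseconds, which would explain a stopwatch reading that barely moves for this variant.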

    So the correct way to run the operations concurrently is the second way, using a list of tasks and Task.WhenAll. Since you've already measured this method and haven't observed any performance improvement, I am assuming that there is something else that serializes the concurrent operations. It could be something like a SemaphoreSlim hidden somewhere in your code, or some mechanism on the server side that throttles your requests. You'll have to investigate further to find where and why the throttling happens.
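
    To illustrate how such a hidden gate cancels the benefit of Task.WhenAll, here is a minimal sketch. The gate and ThrottledGetTickerAsync are hypothetical and do not come from ExchangeSharp; the delay simulates a web request. It assumes .NET 6 or later.

    ```csharp
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical gate that admits one request at a time,
    // similar to a lock buried somewhere inside a library.
    var gate = new SemaphoreSlim(1, 1);

    async Task<string> ThrottledGetTickerAsync(string symbol)
    {
        await gate.WaitAsync();
        try
        {
            await Task.Delay(200); // simulated web request (assumption)
            return symbol + ":ok";
        }
        finally
        {
            gate.Release();
        }
    }

    string[] symbols = { "BTC", "ETH", "XRP" };

    var stopWatch = Stopwatch.StartNew();
    string[] tickers = await Task.WhenAll(symbols.Select(s => ThrottledGetTickerAsync(s)));
    long throttledMs = stopWatch.ElapsedMilliseconds;

    Console.WriteLine($"Task.WhenAll with a hidden gate: {throttledMs} ms");
    ```

    Although all three tasks are started at once, the gate forces them to run one after another, so the total should be close to the sum of the delays rather than the longest one.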

    .NET 6 update: A new API, Parallel.ForEachAsync, is available and is more suitable for solving this problem.
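
    A minimal sketch of how Parallel.ForEachAsync might be used here, assuming .NET 6 or later. FakeGetTickerAsync, the symbol list, and the concurrency limit of 2 are hypothetical choices for illustration, not part of ExchangeSharp.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical stand-in for a ticker request that honors cancellation.
    static async Task<string> FakeGetTickerAsync(string symbol, CancellationToken ct)
    {
        await Task.Delay(200, ct); // simulated web request (assumption)
        return symbol + ":ok";
    }

    string[] symbols = { "BTC", "ETH", "XRP", "LTC" };

    // Results are written from several iterations at once, so use a thread-safe collection.
    var tickers = new ConcurrentDictionary<string, string>();

    // At most two requests are in flight at any time (arbitrary limit for the demo).
    var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

    // Unlike Parallel.ForEach, this returns a Task that completes
    // only after every async iteration has completed.
    await Parallel.ForEachAsync(symbols, options, async (symbol, ct) =>
    {
        tickers[symbol] = await FakeGetTickerAsync(symbol, ct);
    });

    Console.WriteLine($"Fetched {tickers.Count} tickers"); // prints "Fetched 4 tickers"
    ```

    Compared with Task.WhenAll, this form also makes it straightforward to cap the number of concurrent requests, which can help if the server throttles bursts.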