C# Parallel.ForEach: multithreading with an unknown number of threads


I have a sync process that must run on each of my businesses. The number of businesses is ever-changing.

I've read the docs on the Thread class, parallelism, and so on, but I'm not sure how to do this without knowing or naming a predefined number of threads; in this case, that number is unknown. That is why I landed on Parallel.ForEach: I want to run an unknown number of simultaneous operations.

My sync operations run every 10 minutes, and each one takes up to a minute or two. I can't run them one after another, because the next cycle would be triggered before they had all finished.

I want to run them simultaneously in separate threads. Each sync has its own API key; they do not share memory or data and will not modify any shared data.

From my research on multithreading, I think Parallel.ForEach would do the trick, but I need help with the syntax.

This is in a Worker Service. I have a private method called SyncBusiness(int businessId) that calls an API endpoint that syncs the business. I just need help with calling the method.

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var businessIds = (from x in _db.Poslookup
                       select x.BusinessId).Distinct();

    while (!stoppingToken.IsCancellationRequested)
    {
        // Want to multi-thread a sync for each of the businesses in businessIds
        Parallel.ForEach(businessIds, i => { 
            await SyncBusiness(i)
        });

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
    }
}

Also, please comment on any gotchas regarding scalability, thread limits, and any other areas where I might get into trouble if I grew to several thousand businesses to sync. Suggestions on things to read about sync operations and scalability are welcome too.

Thank you so much. Cheers.


Solution

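  • As others have noted, you can't use async with Parallel.ForEach: it takes a synchronous delegate, so an async lambda passed to it compiles to async void and the loop returns without waiting for the work to finish. You can, however, run asynchronous code concurrently by starting all the SyncBusiness calls at once and then awaiting them with Task.WhenAll: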

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
      var businessIds = (from x in _db.Poslookup
                         select x.BusinessId).Distinct();
    
      while (!stoppingToken.IsCancellationRequested)
      {
        var tasks = businessIds.Select(SyncBusiness).ToList();
        await Task.WhenAll(tasks);
    
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
      }
    }
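
    To see the effect outside a Worker Service, here is a minimal, self-contained sketch of the same Select + Task.WhenAll pattern. The SyncBusiness below is a hypothetical stand-in that only waits one second; it is not the asker's real method, and the WhenAllDemo class and the five business IDs are made up for illustration.

    ```csharp
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;

    public static class WhenAllDemo
    {
        // Hypothetical stand-in for SyncBusiness: simulates one second of I/O-bound work.
        private static async Task SyncBusiness(int businessId)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Console.WriteLine($"Synced business {businessId}");
        }

        public static async Task<TimeSpan> RunAsync()
        {
            var businessIds = new[] { 1, 2, 3, 4, 5 };
            var stopwatch = Stopwatch.StartNew();

            // Select starts one call per id; ToList forces the lazy sequence to run now,
            // so all five syncs are in flight before any of them is awaited.
            var tasks = businessIds.Select(SyncBusiness).ToList();
            await Task.WhenAll(tasks);

            stopwatch.Stop();
            return stopwatch.Elapsed;
        }

        public static async Task Main()
        {
            var elapsed = await RunAsync();
            // Because the simulated delays overlap, this should be close to
            // one second rather than five.
            Console.WriteLine($"All syncs finished in {elapsed.TotalSeconds:F1}s");
        }
    }
    ```

    The ToList call matters: without it, the sequence is lazy and no SyncBusiness call starts until something enumerates it.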
    

    I'd also recommend making your database lookup asynchronous (ToListAsync below assumes _db is an Entity Framework context):

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
      while (!stoppingToken.IsCancellationRequested)
      {
        var businessIds = await (from x in _db.Poslookup
                           select x.BusinessId).Distinct().ToListAsync();
    
        var tasks = businessIds.Select(SyncBusiness).ToList();
        await Task.WhenAll(tasks);
    
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        await Task.Delay(600000, stoppingToken);
      }
    }
    

    A final observation: this code currently syncs all the businesses and then waits ten minutes before the next run, so runs start ten minutes plus the sync duration apart. If you want a run to start every 10 minutes, start the timer at the beginning of each loop iteration and await it together with the sync tasks:

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
      while (!stoppingToken.IsCancellationRequested)
      {
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        var timerTask = Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
        var businessIds = await (from x in _db.Poslookup
                           select x.BusinessId).Distinct().ToListAsync();
    
        var tasks = businessIds.Select(SyncBusiness).ToList();
        tasks.Add(timerTask);
        await Task.WhenAll(tasks);
      }
    }
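
    On the scalability question: the code above starts every sync at once. With asynchronous I/O that does not need one thread per business, so with several thousand businesses the pressure is more likely to land on the remote API and on outgoing connections than on thread limits. One common way to cap the number of syncs in flight is a SemaphoreSlim. This is only a sketch and not something the code above does; ThrottledSync, RunAsync, work, and maxConcurrency are hypothetical names, and the right limit depends on what the API tolerates.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ThrottledSync
    {
        // Runs `work` for every id, with at most `maxConcurrency` calls in flight at once.
        public static async Task RunAsync(
            IEnumerable<int> businessIds,
            Func<int, Task> work,
            int maxConcurrency,
            CancellationToken cancellationToken = default)
        {
            using var gate = new SemaphoreSlim(maxConcurrency);

            var tasks = businessIds.Select(async id =>
            {
                // Wait for a free slot before starting this business's sync.
                await gate.WaitAsync(cancellationToken);
                try
                {
                    await work(id);
                }
                finally
                {
                    // Always free the slot, even if the sync throws.
                    gate.Release();
                }
            }).ToList();

            await Task.WhenAll(tasks);
        }

        public static async Task Main()
        {
            // Hypothetical workload: 20 fake businesses, at most 4 syncing at a time.
            await RunAsync(
                Enumerable.Range(1, 20),
                async id =>
                {
                    await Task.Delay(100); // stands in for the real API call
                    Console.WriteLine($"Synced business {id}");
                },
                maxConcurrency: 4);
        }
    }
    ```

    In the worker, the call would look something like await ThrottledSync.RunAsync(businessIds, SyncBusiness, 20, stoppingToken) in place of the Select/WhenAll pair. If you are on .NET 6 or later, Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism is a built-in alternative worth reading about.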