Tags: c#, asp.net-core, task-parallel-library, parallel.foreach, asp.net-core-5.0

Parallel.ForEach MaxDegreeOfParallelism Strange Behavior with Increasing "Chunking"


I'm not sure if the title makes sense, it was the best I could come up with, so here's my scenario.

I have an ASP.NET Core app that I'm using more as a shell and for DI configuration. In Startup it adds a bunch of IHostedServices as singletons, along with their dependencies, also as singletons, with minor exceptions for SqlConnection and DbContext which we'll get to later. The hosted services are groups of similar services that:

  1. Listen for incoming reports from GPS devices and put into a listening buffer.
  2. Parse items out of the listening buffer and put into a parsed buffer.

Eventually there's a single service that reads the parsed buffer and actually processes the parsed reports. It does this by passing each report it takes out of the buffer to a handler and awaiting its completion before moving to the next. This has worked well for the past year, but we now appear to be running into a scalability issue because it processes one report at a time, and the average processing time on the server is 62 ms, which includes the Dapper trip to the database to fetch the needed data and the EF Core trip to save changes.

If, however, the handler decides that a report's information requires triggering background jobs, then I suspect it takes 100 ms or more to complete. Over time the buffer fills faster than the handler can drain it, to the point of holding tens if not hundreds of thousands of reports until they can be processed. This is a problem because notifications are delayed, and because there is potential for data loss if the buffer is still full when the server restarts at midnight.

All that being said, I'm trying to figure out how to make the processing parallel. After lots of experimentation yesterday, I settled on using Parallel.ForEach over the buffer using GetConsumingEnumerable(). This works well, except for a weird behavior I don't know what to do about or even what to call. As the buffer fills and the ForEach iterates over it, the processing begins to "chunk" into ever-increasing multiples of two. The size of the chunking is affected by the MaxDegreeOfParallelism setting. For example (N# = Next # of reports in buffer):

MDP = 1

  • N3 = 1 at a time
  • N6 = 2 at a time
  • N12 = 4 at a time
  • ...

MDP = 2

  • N6 = 1 at a time
  • N12 = 2 at a time
  • N24 = 4 at a time
  • ...

MDP = 4

  • N12 = 1 at a time
  • N24 = 2 at a time
  • N48 = 4 at a time
  • ...

MDP = 8 (my CPU core count)

  • N24 = 1 at a time
  • N48 = 2 at a time
  • N96 = 4 at a time
  • ...

This is arguably worse than the serial execution I have now because by the end of the day it will buffer and wait for, say, half a million reports before actually processing them.

Is there a way to fix this? I'm not very experienced with Parallel.ForEach, so from my point of view this is strange behavior. Ultimately I'm looking for a way to process the reports in parallel as soon as they land in the buffer, so if there are other ways to accomplish this, I'm all ears. Below is roughly the code I have. The handler that processes the reports does use IServiceProvider to create a scope and get an instance of SqlConnection and DbContext. Thanks in advance for any suggestions!

public sealed class GpsReportService :
    IHostedService {
    private readonly GpsReportBuffer _buffer;
    private readonly Config _config;
    private readonly GpsReportHandler _handler;
    private readonly ILogger _logger;

    public GpsReportService(
        GpsReportBuffer buffer,
        Config config,
        GpsReportHandler handler,
        ILogger<GpsReportService> logger) {
        _buffer = buffer;
        _config = config;
        _handler = handler;
        _logger = logger;
    }

    public Task StartAsync(
        CancellationToken cancellationToken) {
        _logger.LogInformation("GPS Report Service => starting");

        Task.Run(Process, cancellationToken).ConfigureAwait(false);//   Is ConfigureAwait here correct usage?

        _logger.LogInformation("GPS Report Service => started");

        return Task.CompletedTask;
    }

    public Task StopAsync(
        CancellationToken cancellationToken) {
        _logger.LogInformation("GPS Report Service => stopping");

        _buffer.CompleteAdding();

        _logger.LogInformation("GPS Report Service => stopped");

        return Task.CompletedTask;
    }

    //  ========================================================================
    //  Utilities
    //  ========================================================================

    private void Process() {
        var options = new ParallelOptions {
            MaxDegreeOfParallelism = 8,
            CancellationToken = CancellationToken.None
        };

        Parallel.ForEach(_buffer.GetConsumingEnumerable(), options, async report => {
            try {
                await _handler.ProcessAsync(report).ConfigureAwait(false);
            } catch (Exception e) {
                if (_config.IsDevelopment) {
                    throw;
                }

                _logger.LogError(e, "GPS Report Service");
            }
        });
    }

    private async Task ProcessAsync() {
        while (!_buffer.IsCompleted) {
            try {
                var took = _buffer.TryTake(out var report, 10);

                if (!took) {
                    continue;
                }

                await _handler.ProcessAsync(report!).ConfigureAwait(false);
            } catch (Exception e) {
                if (_config.IsDevelopment) {
                    throw;
                }

                _logger.LogError(e, "GPS Report Service");
            }
        }
    }
}

public sealed class GpsReportBuffer :
    BlockingCollection<GpsReport> {
}

Solution

  • You can't use Parallel methods with async delegates - at least, not yet.

    Since you already have a "pipeline" style of architecture, I recommend looking into TPL Dataflow. A single ActionBlock may be all that you need, and once you have that working, other blocks in TPL Dataflow may replace other parts of your pipeline.
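    To make that concrete, here is a minimal sketch of the ActionBlock approach (not the asker's actual code: `GpsReport` here is a stand-in record, the handler body is a placeholder for `_handler.ProcessAsync(report)`, and the block requires the `System.Threading.Tasks.Dataflow` NuGet package):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record GpsReport(int Id); // stand-in for the real report type

public static class ActionBlockSketch
{
    public static async Task Main()
    {
        var processed = 0;

        var block = new ActionBlock<GpsReport>(
            async report =>
            {
                // Stand-in for: await _handler.ProcessAsync(report);
                await Task.Delay(10).ConfigureAwait(false);
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8, // a true bound on concurrent handlers
                BoundedCapacity = 1_000     // backpressure: SendAsync waits when full
            });

        // Producers send reports as they arrive (replaces adding to the buffer).
        for (var i = 0; i < 20; i++)
            await block.SendAsync(new GpsReport(i));

        block.Complete();       // called from StopAsync instead of CompleteAdding
        await block.Completion; // all queued and in-flight reports are done here

        Console.WriteLine($"processed {processed} reports");
    }
}
```

    Unlike Parallel.ForEach, the ActionBlock fully understands async delegates: MaxDegreeOfParallelism bounds how many handlers are actually in flight, and BoundedCapacity gives you backpressure instead of an unbounded buffer.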

    If you prefer to stick with your existing buffer, then you should use asynchronous concurrency instead of Parallel:

    private async Task ProcessAsync() {
      var throttler = new SemaphoreSlim(8);
      var tasks = _buffer.GetConsumingEnumerable()
          .Select(async report =>
          {
            await throttler.WaitAsync();
            try {
              await _handler.ProcessAsync(report).ConfigureAwait(false);
            } catch (Exception e) {
              if (_config.IsDevelopment) {
                throw;
              }
    
              _logger.LogError(e, "GPS Report Service");
            }
            finally {
              throttler.Release();
            }
          })
          .ToList();
      await Task.WhenAll(tasks);
    }