Search code examples
c#.net-6.0producer-consumerconcurrentdictionarysystem.threading.channels

Avoid consumers processing the same entry


Cant prevent multiple consumer from processing the same record in queue

using System.Threading.Channels library, when writer enqueues some model, one of multiple consumers starts to process it. during this, writer goes to the database and reads the same record(because it is still being processed and status is not updated) which results in another consumer starting to process the same model. I have implemented ConcurrentDictionary in order to prevent the problem, but it can not help. Any ideas on how to solve this problem?

Here is the code:

public sealed class SendImagesBackgroundService : BackgroundTask
{
    private readonly ILogger<SendImagesBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IApiService _apiService;
    private readonly Channel<InspectionFileModel> _channel;
    private readonly SendImagesBackgroundServiceOptions _options;
    private static readonly ConcurrentDictionary<string, bool>
        s_concurrentDictionary = new();

    public SendImagesBackgroundService(
        ILogger<SendImagesBackgroundService> logger,
        IServiceProvider serviceProvider,
        IApiService apiService,
        IOptions<SendImagesBackgroundServiceOptions> options,
        IBackgroundTaskLockProvider lockProvider) : base(logger)
    {
        UseExclusiveLock(lockProvider);

        _options = options.Value;
        _logger = logger;
        _serviceProvider = serviceProvider;
        _apiService = apiService;

        _channel = Channel.CreateBounded<InspectionFileModel>(
            new BoundedChannelOptions(_options.ChannelQueueSize));
    }

    protected override Task ExecuteAsync(CancellationToken cancellationToken)
    {
        Task.Run(() => FetchImages(cancellationToken), cancellationToken);
        Task.Run(() => StartProcessingInspectionImages(cancellationToken),
            cancellationToken);
        return Task.CompletedTask;
    }

    private async Task FetchImages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            using var serviceScope = _serviceProvider.CreateScope();
            List<InspectionFileModel>? inspectionImages = default;
            do
            {
                try
                {
                     var mediatr = serviceScope.ServiceProvider
                         .GetRequiredService<IMediator>();
                     inspectionImages = await mediatr.Send(
                         new DownloadInspectionFilesQueryNew(),cancellationToken);
                     if (inspectionImages is not {Count: > 0})
                     {
                         continue;
                     }

                     var nonProcessingImages = inspectionImages.Where(
                         x => !s_concurrentDictionary.ContainsKey(x.Id));

                     foreach (var image in nonProcessingImages)
                     {
                         s_concurrentDictionary.TryAdd(image.Id, true);
                         while (!(cancellationToken.IsCancellationRequested &&
                             await _channel.Writer.WaitToWriteAsync(
                                 cancellationToken)))
                         {
                             if (!_channel.Writer.TryWrite(image))
                             {
                                 continue;
                             }

                             break;
                         }
                     }
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                    _logger.LogError(ex, "Fetching inspection images Failed");
                }

            } while (!cancellationToken.IsCancellationRequested
                && inspectionImages  is {Count: > 0});

            await Task.Delay( _options.DelayBetweenFetchBatchMs, cancellationToken);
        }

        _logger.LogInformation("End fetching inspection images");
    }

    private async Task StartProcessingInspectionImages(
        CancellationToken cancellationToken)
    {
        var parallelProcesses = new List<Task>();
        for (int i = 0; i <   _options.NumberOfParallelTasks; i++)
        {
            var task = Task.Run(() => ProcessInspectionImages(cancellationToken),
                cancellationToken);
            parallelProcesses.Add(task);
        }
        await Task.WhenAll(parallelProcesses);
    }

    private async Task ProcessInspectionImages(CancellationToken cancellationToken)
    {
        while (!(cancellationToken.IsCancellationRequested &&
                 await _channel.Reader.WaitToReadAsync(cancellationToken)))
        {
            while (!cancellationToken.IsCancellationRequested &&
                     _channel.Reader.TryRead(out var inspectionImage ))
            {
                try
                {
                    await SendInspectionImageToLivo(inspectionImage,
                        cancellationToken);
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                   //handle
                }
            }
        }
    }

    private async Task SendInspectionImageToLivo(InspectionFileModel image,
        CancellationToken cancellationToken)
    {
        try
        {
            //send data over the network
        }
        catch (ApiException ex)
        {
            //handle 
        }
        finally
        {
            s_concurrentDictionary.TryRemove(image.Id, out bool _);
        }
    }

    public override object? GetTelemetry() => null;
}

The status is updated in the SendInspectionImageToLivo method. If status code is 200, status field becomes successful. In case of 4** it's set to failed, and subsequent DB queries won't include them in the result, based on those statuses.


Solution

  • I think that the key for solving your problem is to take a snapshot of the currently processed images, before fetching the inspectionImages via the DownloadInspectionFilesQueryNew command. Then filter the images based on the snapshot, and not based on the current state of the dictionary. Otherwise it is possible to start fetching the images with a X image stored in the dictionary, while the images are fetched the X image is processed and is removed from the dictionary, the X image is among the fetched images, and eventually the X image is not filtered and it is processed again. Using the snapshot eliminates this possibility.

    Below is an alternative way to implement your service, that you might find interesting. It is based on the Parallel.ForEachAsync overload that takes an IAsyncEnumerable<T> as source. The source is an iterator method that fetches unprocessed images and yields them. I think that it is simpler and more readable than the Channel<T>-based implementation.

    protected override Task ExecuteAsync(CancellationToken cancellationToken)
    {
        ParallelOptions parallelOptions = new()
        {
            MaxDegreeOfParallelism = _options.NumberOfParallelTasks,
            CancellationToken = cancellationToken,
        };
    
        HashSet<string> processing = new();
    
        async IAsyncEnumerable<InspectionFileModel> Producer(
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            while (true)
            {
                HashSet<string> processingSnapshot;
                lock (processing) processingSnapshot = new(processing);
                var inspectionImages = await mediatr.Send(
                    new DownloadInspectionFilesQueryNew(), ct);
                var nonProcessingImages = inspectionImages
                    .Where(x => !processingSnapshot.Contains(x.Id));
                int yieldedCount = 0;
                foreach (InspectionFileModel image in nonProcessingImages)
                {
                    lock (processing) processing.Add(image.Id);
                    yield return image; yieldedCount++;
                }
                if (yieldedCount == 0)
                    await Task.Delay(1000, ct); // Take a small break.
            }
        }
    
        return Parallel.ForEachAsync(Producer(), parallelOptions, async (image, ct) =>
        {
            try
            {
                await SendInspectionImageToLivo(image, ct);
            }
            finally { lock (processing) processing.Remove(image.Id); }
        });
    }
    

    You might find that it has better behavior in case of unhandled exceptions too.