Tags: c#, multithreading, asynchronous, producer-consumer, .net-4.7.2

How to efficiently process millions of local files using .NET Framework and async I/O?


I need to process one big repository (the source repo) and create/update (or leave intact) about 2M files in another repo (the metadata repo).

Although there are ~2M files, most of them are only read to verify that their contents do not need to change. In any single run of the processor maybe a few hundred files are actually written.

My current implementation does NOT use async I/O, even though I would love to. There is a complication that has prevented me from finding a good solution.

Indeed, my code is the consumer part of a producer/consumer pipeline: some code produces the content of metadata files (by processing the source files; the producer piece is not the subject of this question) and puts it onto a blocking collection (I call it the bus), and my code consumes from the bus concurrently on multiple threads.
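
For context, here is the shape of the bus in the current implementation (a sketch; the EndWorkItem constructor and the bounded capacity are illustrative):

// The bus: producers add work items, consumers drain them in parallel.
BlockingCollection<EndWorkItem> endWorkItems = new(boundedCapacity: 1000);

// Producer side (not the subject of this question):
//   endWorkItems.Add(new EndWorkItem(...));
//   endWorkItems.CompleteAdding(); // no more items will arrive

// Consumer side, shown in full below:
//   Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), ...);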

But more importantly, it contains two distinct phases (the existence check in each is sketched right after the list):

  1. Phase 1 - Lasts while a background task that collects the names of all the existing metadata files is still running. While in this phase:
    • The code uses File.Exists to check if a metadata file exists (all are local files).
    • The names of the visited metadata files are recorded in a dedicated collection named processedFiles.
  2. Phase 2 - Begins after the aforementioned background task has completed and the names of all the existing metadata files have been collected into the existingFiles collection. While in this phase:
    • The code checks if a metadata file exists by looking up in the existingFiles collection.
    • The names of the visited metadata files are removed from the existingFiles collection.
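
To make the difference concrete, here is a sketch of the existence check in each phase (the dictionary values are unused; object serves as a dummy):

// Phase 1: the background scan is still running, so ask the file system directly,
// and record every visited path so that phase 2 can subtract it from the scan result.
static bool ExistsPhase1(string filePath, ConcurrentDictionary<string, object> processedFiles)
{
    processedFiles.TryAdd(filePath, null);
    return File.Exists(filePath);
}

// Phase 2: the scan has completed, so the collected set is authoritative.
// Removing the visited path doubles as the existence check; whatever remains
// in existingFiles at the end was never visited and is therefore stale.
static bool ExistsPhase2(string filePath, ConcurrentDictionary<string, object> existingFiles)
{
    return existingFiles.TryRemove(filePath, out _);
}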

The challenge is implementing the transition between the two phases without losing any "packets" on the bus.

I was able to code it relatively easily with Parallel.ForEach, but now I want to use async I/O and wonder what my options are.

I have a big constraint - I must use the .NET Framework. I deplore it, but it is what it is.

I would like to show the essentials of my code, mainly to show how I implement the transition between the phases.

private class EndWorkItemsParallelState
{
    public readonly StringBuilder StringBuilder = new(1000);
    public readonly Guid Guid = Guid.NewGuid();
}

private Task GetEndWorkItemsConsumer(BlockingCollection<EndWorkItem> endWorkItems, int concurrency) => Task.Factory.StartNew(() =>
{
    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(C.IgnoreCase);

    // One reset event per Parallel.ForEach worker, keyed by the worker state's Guid.
    ConcurrentDictionary<Guid, ManualResetEventSlim> locks = new();
    object modeSwitchGuard = new();

    Func<string, byte[], bool> saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty1(filePath, newContent, processedFiles);

    Parallel.ForEach(endWorkItems.GetConsumingEnumerable(), new ParallelOptions
    {
        MaxDegreeOfParallelism = concurrency
    }, () =>
    {
    // Local init: while still in phase 1, register a reset event for this worker
    // so the transitioning thread can wait for it to reach the critical section.
    EndWorkItemsParallelState state = new();
    if (existingFiles == null)
    {
        locks[state.Guid] = new ManualResetEventSlim();
    }
        return state;
    }, (endWorkItem, loop, state) =>
    {
        ProcessEndWorkItem(endWorkItem, saveJsonFileIfDirty);

    // Phase transition: once the background scan has completed, the first worker
    // to take modeSwitchGuard performs the switch (explained in detail below).
    if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
        {
            locks[state.Guid].Set();
            lock (modeSwitchGuard)
            {
                if (existingFiles == null)
                {
                    foreach (var @lock in locks.Values)
                    {
                        @lock.Wait();
                    }

                    existingFiles = m_collectExistingFilesTask.Result;
                    foreach (var processedFile in processedFiles.Keys)
                    {
                        existingFiles.TryRemove(processedFile, out _);
                    }
                    processedFiles = null;
                    
                    saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
                }
            }
        }

        return state;
    }, state =>
    {
    // Worker finalizer: signal this worker's event so a transitioning thread
    // is not left waiting for a worker that has already exited the loop.
    if (locks.TryGetValue(state.Guid, out var @lock))
    {
        @lock.Set();
    }
    });

    // Dispose of the per-worker reset events.
    foreach (var @lock in locks.Values)
    {
        @lock.Dispose();
    }

    DeleteStaleFiles(existingFiles.Keys);

    void ProcessEndWorkItem(EndWorkItem endWorkItem, Func<string, byte[], bool> saveJsonFileIfDirty){ ... }
    static bool SaveIfDirty1(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles){ ... }
    static bool SaveIfDirty2(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles){ ... }
}, TaskCreationOptions.LongRunning);

The transition is implemented here:

if (existingFiles == null && m_collectExistingFilesTask.IsCompleted)
{
    locks[state.Guid].Set();
    lock (modeSwitchGuard)
    {
        if (existingFiles == null)
        {
            foreach (var @lock in locks.Values)
            {
                @lock.Wait();
            }

            existingFiles = m_collectExistingFilesTask.Result;
            foreach (var processedFile in processedFiles.Keys)
            {
                existingFiles.TryRemove(processedFile, out _);
            }
            processedFiles = null;

            saveJsonFileIfDirty = (filePath, newContent) => SaveIfDirty2(filePath, newContent, existingFiles);
        }
    }
}

When a thread detects that phase 1 should be over, it tries to enter the critical section (only one succeeds) and then waits for ALL the other threads to hit the critical section and block on it. It knows when the other threads reach it, because each thread has its own ManualResetEventSlim instance, which it signals when it gets to the critical section. The thread that is inside waits for all these signals. Once all are set, all the threads are blocked at the critical section and it is safe to perform the transition between the phases.

Since posting this question I have found a solution, but I wonder if it can be improved.

EDIT 1

Each work item on the bus contains the file path and the new content. The processing checks whether the file exists AND whether the new content is really different (by reading the existing file and diffing). If the content is different OR the file is new, the new content is written out AND another file is created in another repo.
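
Phase-dependent existence check aside, the dirty check boils down to something like this (a sketch; the companion write into the other repo is elided):

// A file is dirty when it does not exist yet or its current bytes differ from the new content.
static bool IsDirty(string filePath, byte[] newContent)
{
    if (!File.Exists(filePath))
    {
        return true;
    }
    byte[] oldContent = File.ReadAllBytes(filePath);
    return !oldContent.SequenceEqual(newContent); // System.Linq
}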

At the end I need to delete all the stale files, i.e. those which were never visited. That is why I collect all the existing files.
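
So DeleteStaleFiles at the end of the run is essentially this (a sketch):

// Whatever is still in existingFiles after the run was never visited, hence stale.
static void DeleteStaleFiles(IEnumerable<string> staleFilePaths)
{
    foreach (string filePath in staleFilePaths)
    {
        File.Delete(filePath);
    }
}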


Solution

Here is my solution:
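
    One note before the code: the Reader.WaitToReadAsync/TryRead calls below belong to the ChannelReader<T> API, i.e. the bus is now a Channel<EndWorkItem> from the System.Threading.Channels package (available for .NET Framework via NuGet) rather than a BlockingCollection. A sketch of the assumed setup (the capacity is illustrative):

    // Assumed setup; the producer writes via endWorkItems.Writer and calls
    // endWorkItems.Writer.Complete() when done, which is what eventually makes
    // WaitToReadAsync below return false so the consumers can exit.
    Channel<EndWorkItem> endWorkItems = Channel.CreateBounded<EndWorkItem>(1000);

    With that in place, the consumer code: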

    ConcurrentDictionary<string, object> existingFiles = null;
    ConcurrentDictionary<string, object> processedFiles = new(C.IgnoreCase);
    
    Func<string, byte[], CancellationToken, Task<bool>> saveJsonFileIfDirtyAsync = (filePath, newContent, ct) => SaveIfDirty1Async(filePath, newContent, processedFiles, ct);
    
    var cancellationTokenSource = new CancellationTokenSource();
    // Phase 1: consume until the background scan of existing files completes.
    await Task.WhenAll(Enumerable.Range(0, concurrency).Select(async _ =>
    {
        try
        {
            while (!m_collectExistingFilesTask.IsCompleted && await endWorkItems.Reader.WaitToReadAsync(cancellationTokenSource.Token))
            {
                while (!m_collectExistingFilesTask.IsCompleted && endWorkItems.Reader.TryRead(out var endWorkItem))
                {
                    await ProcessEndWorkItemAsync(endWorkItem);
                }
            }
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            cancellationTokenSource.Cancel();
            throw;
        }
    }));
    
    // All phase-1 consumers have stopped, so this section runs single-threaded;
    // block for the scan result in case it has not finished yet.
    existingFiles = m_collectExistingFilesTask.GetAwaiter().GetResult();
    foreach (var processedFile in processedFiles.Keys)
    {
        existingFiles.TryRemove(processedFile, out _);
    }
    processedFiles = null;
    
    saveJsonFileIfDirtyAsync = (filePath, newContent, ct) => SaveIfDirty2Async(filePath, newContent, existingFiles, ct);
    
    // Phase 2: consume the rest of the bus until the producer completes the channel.
    await Task.WhenAll(Enumerable.Range(0, concurrency).Select(async _ =>
    {
        try
        {
            do
            {
                while (endWorkItems.Reader.TryRead(out var endWorkItem))
                {
                    await ProcessEndWorkItemAsync(endWorkItem);
                }
            }
            while (await endWorkItems.Reader.WaitToReadAsync(cancellationTokenSource.Token));
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            cancellationTokenSource.Cancel();
            throw;
        }
    }));
    
    progressHelper.Done();
    
    async Task ProcessEndWorkItemAsync(EndWorkItem endWorkItem){ ... }
    static Task<bool> SaveIfDirty1Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> processedFiles, CancellationToken ct){ ... }
    static Task<bool> SaveIfDirty2Async(string filePath, byte[] newContent, ConcurrentDictionary<string, object> existingFiles, CancellationToken ct){ ... }
    

    EDIT 1

    Based on the valuable input from @TheodorZoulias I have introduced a cancellation token to facilitate failing fast. The local method ProcessEndWorkItemAsync captures the cancellationTokenSource object and the saveJsonFileIfDirtyAsync delegate, and passes cancellationTokenSource.Token to the captured saveJsonFileIfDirtyAsync delegate.

    When any task fails, it cancels the token, which in turn cancels all the other concurrently running tasks.
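
    For illustration, the token plumbing looks roughly like this (a sketch; the work-item property names are illustrative):

    async Task ProcessEndWorkItemAsync(EndWorkItem endWorkItem)
    {
        // Captured: cancellationTokenSource and the current saveJsonFileIfDirtyAsync delegate.
        cancellationTokenSource.Token.ThrowIfCancellationRequested();
        bool written = await saveJsonFileIfDirtyAsync(endWorkItem.FilePath, endWorkItem.NewContent, cancellationTokenSource.Token);
        // ... create/update the companion file in the other repo when written is true ...
    }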