Tags: c#, asynchronous, task, yield, concurrently

Batches from yield don't start up concurrently


I'm currently trying to implement a file searcher that yields found files in batches so they can be processed concurrently. My problem arises when it yields a batch and I pass it to a task: it never starts multiple tasks. Instead it waits for the previous task to finish before starting the next one, whereas I want it to start as many tasks as my SemaphoreSlim allows. I'm new to yield and unsure whether the yield is the problem, i.e. whether it waits until the previous iteration is done even though the batch gets passed to Task.Run().

public async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var index = 0;
    using var semaphore = new SemaphoreSlim(8, 16);
    foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
    {
        await semaphore.WaitAsync();
        try
        {
            tasks.Add(StartTasksAsync(items, index, cancellationToken).ContinueWith(task =>
            {
                semaphore.Release();
                tasks.Remove(task);
            }));
            index++;
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, "Error in HarvestEngine RunAsync");
            semaphore.Release();
        }
    }

    await Task.WhenAll(tasks);
}

private async Task StartTasksAsync(
    IEnumerable<string> items, 
    int index,
    CancellationToken cancellationToken)
{
    _logger.LogInformation($"[Task {index}] has started.....");

    var databaseService = _serviceProvider.GetRequiredService<DatabaseService>();
    var collectedSensitiveFiles = new List<FileInformation>();

    foreach (var item in items)
    {
        _logger.LogInformation($"{index} has found {item}");
    }
    await Task.Delay(1000);

    _logger.LogInformation($"[Task {index}] is done.....");
}

If I set the Task.Delay to 10 seconds, it waits 10 seconds before the next task starts, even though the Task.Delay is inside my newly started task and should only make that task sleep, not make the loop wait before the next task can start. If I set it to 1 minute, it takes 1 minute before the next one starts, and so on, which doesn't make sense to me (unless there is something about yield I don't know).

This is how I yield the files I find:

internal static IEnumerable<IEnumerable<string>> EnumerateFilesRecursively(
    string directory, 
    int batchSize,
    CancellationToken cancellationToken)
{
    Stack<string> stack = new Stack<string>();
    List<string> currentFoundFiles = new List<string>();
    stack.Push(directory);

    while (stack.Count > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();
        string currentDir = stack.Pop();

        try
        {
            foreach (string subDir in Directory.GetDirectories(currentDir))
                stack.Push(subDir);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher: UnauthorizedAccessException");
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher: DirectoryNotFoundException");
        }

        try
        {
            var files = Directory.GetFiles(currentDir, "*")
                .Where(file => allowedExtensions.Contains(Path.GetExtension(file)))
                .ToList();

            currentFoundFiles.AddRange(files);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher 2: UnauthorizedAccessException");
            continue;
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher 2: DirectoryNotFoundException");
            continue;
        }

        if (currentFoundFiles.Count < batchSize) continue;
        yield return currentFoundFiles;
        currentFoundFiles.Clear();
    }

    if (currentFoundFiles.Count > 0)
        yield return currentFoundFiles;
}

** UPDATED **

So I have simplified it. What puzzles me is that the foreach in my Task.Run prints all items only for the last task I create; every earlier task prints only the first item of its IEnumerable collection.

 public async Task RunAsync(CancellationToken cancellationToken)
 {
     var tasks = new List<Task>();
     var index = 0;
     using var semaphore = new SemaphoreSlim(16, 16);
     foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
     {
         await semaphore.WaitAsync(cancellationToken);
         try
         {
             var localIndex = index;
             tasks.Add(Task.Run(async () =>
             {
                 _logger.LogInformation($"[Task {localIndex}] has started.....");
           
                 

                 _logger.LogInformation($"[Task {localIndex}] is done.....");
                 semaphore.Release();
             }, cancellationToken));
             index++;
         }
         catch(Exception ex)
         {
             semaphore.Release();
             _logger.LogError(ex, "Error in HarvestEngine RunAsync");
         }
     }

     await Task.WhenAll(tasks);
 }
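One likely contributor to the symptom in the update: EnumerateFilesRecursively yields the same List<string> instance every time and calls Clear() on it as soon as the loop asks for the next batch. A task that reads its batch later therefore sees a list that has been emptied and refilled, and enumerating a List<T> while it is being modified throws InvalidOperationException. Only the final batch is never cleared afterwards, which would fit "all items only on the last task". The sketch below is a minimal illustration under assumptions: BatchShared and BatchFresh are hypothetical helpers over in-memory strings rather than the file system, and calling ToList() on the sequence stands in for a consumer that reads each batch after the iterator has moved on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var files = Enumerable.Range(1, 7).Select(i => $"file{i}.txt").ToList();

// Materialize every batch before reading any of them, like tasks that run later.
var shared = BatchShared(files, 3).ToList();
var fresh = BatchFresh(files, 3).ToList();

Console.WriteLine("shared: " + string.Join(" | ", shared.Select(b => string.Join(",", b))));
Console.WriteLine("fresh:  " + string.Join(" | ", fresh.Select(b => string.Join(",", b))));

// Mirrors the pattern in the question: one list is yielded, then cleared and refilled.
static IEnumerable<List<string>> BatchShared(IEnumerable<string> source, int batchSize)
{
    var current = new List<string>();
    foreach (var item in source)
    {
        current.Add(item);
        if (current.Count < batchSize) continue;
        yield return current;
        current.Clear(); // runs when the consumer asks for the NEXT batch
    }
    if (current.Count > 0) yield return current;
}

// Hypothetical fix: give every batch its own list so later iteration cannot mutate it.
static IEnumerable<List<string>> BatchFresh(IEnumerable<string> source, int batchSize)
{
    var current = new List<string>();
    foreach (var item in source)
    {
        current.Add(item);
        if (current.Count < batchSize) continue;
        yield return current;
        current = new List<string>(); // the yielded list is left untouched
    }
    if (current.Count > 0) yield return current;
}
```

With BatchShared, all three entries are the same list object, holding only the last file; BatchFresh keeps the three batches separate. In EnumerateFilesRecursively the equivalent change would be replacing currentFoundFiles.Clear() after the yield with currentFoundFiles = new List<string>(); (or yielding currentFoundFiles.ToList() instead).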

Solution

  • StartTasksAsync runs synchronously until the first await of an operation that isn't already complete, which, in your case, isn't until after the foreach.

    If your intent is for these to actually run in the background, you could:

    1. add an await Task.Yield(); at the start of the method, to force it to become truly async earlier
    2. use Task.Run when invoking the method, taking care around any captured variables (so that you pass in the intended values, not the values at some future time when the callback gets invoked)

    The second is more explicit about running in the background, but either should work.
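The two options above can be sketched as a small console program (top-level statements, C# 9 or later). WorkAsync is a hypothetical stand-in for StartTasksAsync, and its Thread.Sleep plays the role of the synchronous foreach over the batch. The log is meant to show that without an early await the caller only gets the Task back after the synchronous part has run, while Task.Yield() or Task.Run hand control back first. The interleaving in the last two cases depends on the thread pool, so treat it as illustrative rather than guaranteed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var log = new ConcurrentQueue<string>();

// No early await: the synchronous prefix runs on the caller before it gets the Task back.
Task plain = WorkAsync("plain", yieldFirst: false);
log.Enqueue("caller: got plain task");

// Option 1: await Task.Yield() first, so the method returns to the caller immediately.
Task yielded = WorkAsync("yielded", yieldFirst: true);
log.Enqueue("caller: got yielded task");

// Option 2: Task.Run queues the whole method to the thread pool.
var runs = new List<Task>();
for (var i = 0; i < 2; i++)
{
    var localIndex = i; // a 'for' variable is shared across iterations, so copy it before capturing
    runs.Add(Task.Run(() => WorkAsync($"run {localIndex}", yieldFirst: false)));
}
log.Enqueue("caller: got run tasks");

await Task.WhenAll(runs.Append(plain).Append(yielded));
var lines = log.ToList();
lines.ForEach(Console.WriteLine);

async Task WorkAsync(string name, bool yieldFirst)
{
    if (yieldFirst)
        await Task.Yield(); // the rest of the method continues on the thread pool
    Thread.Sleep(200);      // stand-in for synchronous per-item work
    log.Enqueue($"{name}: synchronous part done");
    await Task.Delay(100);  // first await that is genuinely asynchronous
}
```

In the plain case the Task.Delay never holds up the caller; what holds it up is everything before that first await.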


    Additionally:

    • the semaphore only seems to protect tasks, but that method is the only thing accessing tasks, so no protection should be necessary; IMO, remove the semaphore completely
    • you are removing things from tasks before they're completed; IMO, remove the Remove call
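Applying those two suggestions, the loop could look roughly like this hedged sketch. The in-memory batches and ProcessBatchAsync are hypothetical stand-ins for EnumerateFilesRecursively and StartTasksAsync. Note that this version starts one task per batch with no upper bound on how many run at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical batches standing in for EnumerateFilesRecursively(...); each batch is its own list.
var batches = Enumerable.Range(0, 4)
    .Select(b => Enumerable.Range(b * 3, 3).Select(i => $"file{i}.txt").ToList())
    .ToList();

var processed = new ConcurrentBag<string>(); // written from several tasks, so thread-safe
var tasks = new List<Task>();                // only touched by this loop: no lock or semaphore
var index = 0;

foreach (var items in batches)
{
    var localIndex = index++; // copy the value this task should see
    tasks.Add(Task.Run(() => ProcessBatchAsync(items, localIndex, CancellationToken.None)));
}

// Nothing is removed from 'tasks'; WhenAll simply waits for every batch to finish.
await Task.WhenAll(tasks);
Console.WriteLine($"Processed {processed.Count} files in {tasks.Count} tasks");

async Task ProcessBatchAsync(IReadOnlyList<string> items, int taskIndex, CancellationToken ct)
{
    foreach (var item in items)
        processed.Add($"[{taskIndex}] {item}");
    await Task.Delay(100, ct); // stand-in for real asynchronous work
}
```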

    If the intention of the semaphore is that you want a fixed number of things running concurrently: maybe use Parallel.ForEach instead, with ParallelOptions.MaxDegreeOfParallelism set to that number.
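A minimal sketch of that approach, assuming the per-batch work is synchronous. Parallel.ForEach takes a synchronous delegate; an async lambda passed to it would become async void and would not be awaited. The batches are hypothetical in-memory lists. The running and maxObserved counters exist only to show that no more than MaxDegreeOfParallelism batches are processed at once. Because Parallel.ForEach may pull several items from the enumerator before processing them, each batch needs to be its own list rather than one reused list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical batches standing in for EnumerateFilesRecursively(...).
IEnumerable<List<string>> batches = Enumerable.Range(0, 10)
    .Select(b => Enumerable.Range(b * 5, 5).Select(i => $"file{i}.txt").ToList());

var running = 0;
var maxObserved = 0;
var processedCount = 0;

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 4, // at most 4 batches in flight at once
    CancellationToken = CancellationToken.None
};

Parallel.ForEach(batches, options, items =>
{
    var now = Interlocked.Increment(ref running);
    // Record the highest concurrency seen (lock-free max update).
    int seen;
    while (now > (seen = Volatile.Read(ref maxObserved)) &&
           Interlocked.CompareExchange(ref maxObserved, now, seen) != seen) { }

    foreach (var item in items)
        Interlocked.Increment(ref processedCount); // stand-in for synchronous per-file work
    Thread.Sleep(50);

    Interlocked.Decrement(ref running);
});

Console.WriteLine($"processed {processedCount} files, max {maxObserved} batches at once");
```

If the per-batch work is asynchronous, .NET 6 and later also provide Parallel.ForEachAsync, which accepts the same ParallelOptions and awaits each body.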