I'm trying to implement a file searcher that yields found files in batches so they can be processed concurrently. My problem appears when a batch is yielded and I pass it to a task: multiple tasks never start up. Instead, it waits for the previous task to finish before starting the next one, whereas I want it to start as many tasks as my SemaphoreSlim allows. I'm new to yielding and am unsure whether the yield is the problem, because it waits until the previous yield iteration is done even though the batch gets passed to Task.Run().
public async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var index = 0;
    using var semaphore = new SemaphoreSlim(8, 16);
    foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
    {
        await semaphore.WaitAsync();
        try
        {
            tasks.Add(StartTasksAsync(items, index, cancellationToken).ContinueWith(task =>
            {
                semaphore.Release();
                tasks.Remove(task);
            }));
            index++;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in HarvestEngine RunAsync");
            semaphore.Release();
        }
    }
    await Task.WhenAll(tasks);
}

private async Task StartTasksAsync(
    IEnumerable<string> items,
    int index,
    CancellationToken cancellationToken)
{
    _logger.LogInformation($"[Task {index}] has started.....");
    var databaseService = _serviceProvider.GetRequiredService<DatabaseService>();
    var collectedSensitiveFiles = new List<FileInformation>();
    foreach (var item in items)
    {
        _logger.LogInformation($"{index} has found {item}");
    }
    await Task.Delay(1000);
    _logger.LogInformation($"[Task {index}] is done.....");
}
If I set Task.Delay to 10 seconds, it waits 10 seconds before the next task starts, even though the Task.Delay is inside my newly started task and should only make that task sleep, not delay the start of the next one. So if I set it to 1 minute, it takes 1 minute before the next task starts, and so on, which doesn't really make sense to me (unless there is some yielding behavior I don't know about).
This is how I yield the files found:
internal static IEnumerable<IEnumerable<string>> EnumerateFilesRecursively(
    string directory,
    int batchSize,
    CancellationToken cancellationToken)
{
    Stack<string> stack = new Stack<string>();
    List<string> currentFoundFiles = new List<string>();
    stack.Push(directory);
    while (stack.Count > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();
        string currentDir = stack.Pop();
        try
        {
            foreach (string subDir in Directory.GetDirectories(currentDir))
                stack.Push(subDir);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher: UnauthorizedAccessException");
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher: DirectoryNotFoundException");
        }
        try
        {
            var files = Directory.GetFiles(currentDir, "*")
                .Where(file => allowedExtensions.Contains(Path.GetExtension(file)))
                .ToList();
            currentFoundFiles.AddRange(files);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher 2: UnauthorizedAccessException");
            continue;
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher 2: DirectoryNotFoundException");
            continue;
        }
        if (currentFoundFiles.Count < batchSize) continue;
        yield return currentFoundFiles;
        currentFoundFiles.Clear();
    }
    if (!currentFoundFiles.IsNullOrEmpty())
        yield return currentFoundFiles;
}
** UPDATED **
So I have simplified it.
What puzzles me is that the foreach in my Task.Run only prints all items for the last created task; for all previous tasks it only prints the first item in the IEnumerable collection.
public async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var index = 0;
    using var semaphore = new SemaphoreSlim(16, 16);
    foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
    {
        await semaphore.WaitAsync(cancellationToken);
        try
        {
            var localIndex = index;
            tasks.Add(Task.Run(async () =>
            {
                _logger.LogInformation($"[Task {localIndex}] has started.....");
                _logger.LogInformation($"[Task {localIndex}] is done.....");
                semaphore.Release();
            }, cancellationToken));
            index++;
        }
        catch (Exception ex)
        {
            semaphore.Release();
            _logger.LogError(ex, "Error in HarvestEngine RunAsync");
        }
    }
    await Task.WhenAll(tasks);
}
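A possible explanation for the behavior described in this update, offered as a sketch rather than a confirmed diagnosis: EnumerateFilesRecursively yields the same List instance every time and calls Clear() on it as soon as the consumer asks for the next batch. A task that is still enumerating that list on another thread may then find it emptied or modified underneath it, while the final batch is never cleared. The names BatchingSketch, SharedBatches, and IndependentBatches below are hypothetical and only illustrate the difference between reusing one list and handing out a fresh list per batch.

```csharp
using System.Collections.Generic;

public static class BatchingSketch
{
    // Mirrors the pattern in the question: one list is yielded, then cleared and reused.
    // Every yielded "batch" is therefore a reference to the same list object.
    public static IEnumerable<List<string>> SharedBatches(IEnumerable<string> source, int batchSize)
    {
        var current = new List<string>();
        foreach (var item in source)
        {
            current.Add(item);
            if (current.Count < batchSize) continue;
            yield return current;
            current.Clear(); // also empties the batch the consumer was just given
        }
        if (current.Count > 0)
            yield return current;
    }

    // Alternative: start a new list after each yield, so a consumer that keeps
    // a batch (for example inside a running task) owns it exclusively.
    public static IEnumerable<List<string>> IndependentBatches(IEnumerable<string> source, int batchSize)
    {
        var current = new List<string>(batchSize);
        foreach (var item in source)
        {
            current.Add(item);
            if (current.Count < batchSize) continue;
            yield return current;
            current = new List<string>(batchSize); // fresh list, the yielded one is untouched
        }
        if (current.Count > 0)
            yield return current;
    }
}
```

With the input "a", "b", "c" and a batch size of 2, collecting the results of SharedBatches gives two references to one list that ends up holding only "c", whereas IndependentBatches gives one list holding "a" and "b" and a second list holding "c".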
StartTasksAsync is synchronous until the first await that isn't itself synchronous, which in your case isn't until after the foreach.
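To make that concrete, here is a minimal sketch of how an async method executes on the caller's thread until it reaches an await on something that has not completed yet. The class SyncUntilAwaitSketch and its members are hypothetical names for illustration and are not part of the question's code.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncUntilAwaitSketch
{
    // Returns the order in which the caller and the async method made progress.
    public static async Task<List<string>> RunAsync()
    {
        var events = new List<string>();
        Task pending = WorkAsync(events);        // runs synchronously up to the first incomplete await
        events.Add("caller: got the task back");
        await pending;
        events.Add("caller: task completed");
        return events;
    }

    private static async Task WorkAsync(List<string> events)
    {
        events.Add("work: start");
        await Task.CompletedTask;                // already complete, so execution just continues
        events.Add("work: still synchronous");
        await Task.Delay(100);                   // not complete, so control returns to the caller here
        events.Add("work: after real await");
    }
}
```

The recorded order is "work: start", "work: still synchronous", "caller: got the task back", "work: after real await", "caller: task completed". Everything before the Task.Delay line, including any loop placed there, runs before the caller gets control back.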
If your intent is for these to actually run in the background, you could:
- add await Task.Yield(); at the start of the method, to force it to become truly async earlier
- use Task.Run when invoking the method, taking care around any captured variables (so that you pass in the intended values, not the values at some future time when the callback gets invoked)

The second is more explicit about running in the background, but either should work.
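The two options can be sketched roughly as follows. This is an illustration under assumptions, not the question's actual code: BackgroundStartSketch, ProcessWithYieldAsync, StartAllAsync, and the log queue are hypothetical stand-ins for the real work and the logger.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BackgroundStartSketch
{
    // Option 1: yield at the very top so the caller gets its Task back immediately.
    // Note: with no SynchronizationContext the remainder runs on the thread pool;
    // under a UI context it would be posted back to that context instead.
    public static async Task ProcessWithYieldAsync(
        IReadOnlyList<string> items, int index, ConcurrentQueue<string> log)
    {
        await Task.Yield();
        foreach (var item in items)
            log.Enqueue($"{index}: {item}");
    }

    // Option 2: wrap the call in Task.Run and copy any variable that keeps changing.
    public static async Task StartAllAsync(
        IEnumerable<IReadOnlyList<string>> batches, ConcurrentQueue<string> log)
    {
        var tasks = new List<Task>();
        var index = 0;
        foreach (var batch in batches)
        {
            var localIndex = index; // capture a copy; 'index' itself is incremented below
            tasks.Add(Task.Run(() =>
            {
                foreach (var item in batch)
                    log.Enqueue($"{localIndex}: {item}");
            }));
            index++;
        }
        await Task.WhenAll(tasks);
    }
}
```

Capturing index directly inside the lambda would read whatever value it holds when the callback eventually runs, which is why the copy is taken first. The foreach variable batch is scoped per iteration, so capturing it is safe as long as each batch is its own collection.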
Additionally:
- the semaphore appears to be there to protect tasks, but that method is the only thing accessing tasks, so no protection should be necessary; IMO remove the semaphore completely
- entries are being removed from tasks before they're completed; IMO remove the Remove
If the intention of the semaphore is that you want a fixed number of things running concurrently: maybe use Parallel.ForEach instead, using ParallelOptions.MaxDegreeOfParallelism.
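A minimal sketch of that suggestion, assuming the per-batch work is synchronous. BoundedParallelSketch and ProcessAll are hypothetical names, and counting items stands in for the real processing.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedParallelSketch
{
    // Processes batches with at most 'maxConcurrency' running at the same time
    // and returns the total number of items seen.
    public static int ProcessAll(
        IEnumerable<IReadOnlyList<string>> batches,
        int maxConcurrency,
        CancellationToken cancellationToken)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxConcurrency,
            CancellationToken = cancellationToken
        };

        var processed = 0;
        Parallel.ForEach(batches, options, batch =>
        {
            // Placeholder for the real per-batch work (assumed to be synchronous here).
            Interlocked.Add(ref processed, batch.Count);
        });
        return processed;
    }
}
```

Parallel.ForEach blocks until all batches are done and expects a synchronous body; passing an async lambda would not be awaited. If the per-batch work is genuinely asynchronous, Parallel.ForEachAsync (available from .NET 6) may be a better fit. Either way, each batch should be its own list instance, because the source is enumerated while earlier batches are still being processed.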