Search code examples
c#multithreadingtaskproducer-consumer

Second Task does not end although loop parameter condition is updated in separate task


Method readAllFilesAtRootFolder:

public static Dictionary<string, List<mediaFile>> readAllFilesAtRootFolder(
    string rootFolder, string[] extensions, bool subFolders,
    ConcurrentQueue<mediaFile> fileQueue, CancellationToken cancellationToken)
{
    var allFiles = new Dictionary<string, List<mediaFile>>();
    IEnumerable<string> files;
    try
    {
        if (subFolders)
        {
            files = extensions.SelectMany(ext => System.IO.Directory.GetFiles(
                rootFolder, ext, SearchOption.AllDirectories));
        }
        else
        {
            files = extensions.SelectMany(ext => System.IO.Directory.GetFiles(
                rootFolder, ext, SearchOption.TopDirectoryOnly));
        }
        int i = 1;
        foreach (var file in files)
        {
            if (cancellationToken.IsCancellationRequested)
                break;

            mediaFile mediaFile = new mediaFile();
            string extension = Path.GetExtension(file).Replace(".", "").ToUpper();
            // If the extension is not empty, group the files by extension
            if (!string.IsNullOrEmpty(extension))
            {
                if (!allFiles.ContainsKey(extension))
                {
                    allFiles[extension] = new List<mediaFile>();
                }
                mediaFile.fileName = file;
                //mediaFile.exif_date = getImageExifDate(file.ToString());
                fileQueue.Enqueue(mediaFile);
                allFiles[extension].Add(mediaFile);
            }
        }
        return allFiles;
    }
    catch { }
    return allFiles;
}

Method readAllEXIFdates:

public static void readAllEXIFdates(ConcurrentQueue<mediaFile> fileQueue,
    ConcurrentDictionary<string, List<mediaFile>> resultDictionary,
    CancellationToken cancellationToken, bool producerCompleted)
{
    var allEXIFFiles = new Dictionary<string, List<mediaFile>>();

    {
        //Provide some free time for the main thread
        //Thread.Sleep(100);
        while (!cancellationToken.IsCancellationRequested
            && (!producerCompleted || !fileQueue.IsEmpty))
        {
            if (fileQueue.TryDequeue(out var mediaFile))
            {
                // Process EXIF date
                mediaFile.exif_date = getImageExifDate(mediaFile.fileName);
                var extension = Path.GetExtension(mediaFile.fileName)
                    .ToUpperInvariant().TrimStart('.');

                if (!string.IsNullOrEmpty(extension))
                {
                    resultDictionary.AddOrUpdate(extension,
                        new List<mediaFile> { mediaFile },
                        (key, existingList) =>
                        {
                            existingList.Add(mediaFile);
                            return existingList;
                        });
                }
            }
            else
            {
                // Thread.Sleep(100);
                // Wait briefly if no files are available in the queue
            }
        }
    }
}

Button click event handler:

private async void button4_Click_1(object sender, EventArgs e)
{
    string[] extenions = getSelectExtensions(chkExtensions);

    label5.Text = "Raading files, please wait....";

    _cancellationTokenSource = new CancellationTokenSource();

    string[] extensions = getSelectExtensions(chkExtensions);

    label5.Text = "Reading files, please wait...";

    // Define shared resources
    var fileQueue = new ConcurrentQueue<mediaFile>();
    var resultDictionary = new ConcurrentDictionary<string, List<mediaFile>>();

    bool producerCompleted = false;
    // Producer Task
    var producerTask = Task.Run(() =>
    {
        try
        {
            readAllFilesAtRootFolder(
                "H:/My Photos/IMages/2015 - Dhaham's Choir Concert",
                extensions, chkSubFolders.Checked, fileQueue,
                _cancellationTokenSource.Token);
        }
        finally
        {
            producerCompleted = true;
        }
    });

    var consumerTask = Task.Run(() => readAllEXIFdates(fileQueue, resultDictionary,
        _cancellationTokenSource.Token, producerCompleted));
    // Wait for both tasks to complete
    await Task.WhenAll(producerTask, consumerTask);

    label8.Text = $"Total files processed: {resultDictionary.Values.Count}";
}

The readAllEXIFdates() should exit when producerCompleted is changed to true but the readAllEXIFdates() loop never sees the update, even producerCompleted is updated as a public parameter to the class. Why is that ?

while (!cancellationToken.IsCancellationRequested
    && (!producerCompleted || !fileQueue.IsEmpty))

This is the while loop that does not exit since producerCompleted never get updated.


Solution

  • You have done an enormous amount of work to basically simulate an existing framework type - the BlockingCollection. You should use this instead of a ConcurrentQueue.

    Change your producer to Add to it, and then call CompleteAdding when done.

    Change your consumer to foreach over GetConsumingEnumerable.

    Done. That is all you need to do. GetConsumingEnumerable will automatically serve up the items added to it, and the foreach will exit once all queue items are processed. There is no need for producerCompleted etc.