Search code examples
c#multithreadingasynchronousasync-awaitmanualresetevent

How to properly use Async, Await and ManualResetEvents to control an infinite while loop


So what i am trying to do here is:

  1. Make the engine loop and work on an object if the queue is not empty.
  2. If the queue is empty i call the manualresetevent to make the thread sleep.
  3. When a item is added and the loop is not active i set the manualresetevent.
  4. To make it faster i pick up atmost 5 items from the list and perform operation on them asynchronously and wait for all of them to finish.

Problem:

  1. The clear methods on the two lists are called as soon as a new call to the AddToUpdateQueueMethod is called.
  2. In my head as i am waiting for Task.WhenAll(tasks), so thread should wait for its completion before moving ahead, hence the clear on the lists should only be called on after Task.WhenAll(tasks) returns.

What am i missing here, or what will be a better way to achieve this.

    public async Task ThumbnailUpdaterEngine()
    {
        int count;
        List<Task<bool>> tasks = new List<Task<bool>>();
        List<Content> candidateContents = new List<Content>();
        while (true)
        {

            for (int i = 0; i < 5; i++)
            {
                Content nextContent = GetNextFromInternalQueue();
                if (nextContent == null)
                    break;
                else
                    candidateContents.Add(nextContent);

            }

            foreach (var candidateContent in candidateContents)
            {
                foreach (var provider in interactionProviders)
                {
                    if (provider.IsServiceSupported(candidateContent.ServiceType))
                    {
                        Task<bool> task = provider.UpdateThumbnail(candidateContent);
                        tasks.Add(task);
                        break;
                    }
                }
            }
            var results = await Task.WhenAll(tasks);
            tasks.Clear();
            foreach (var candidateContent in candidateContents)
            {
                if (candidateContent.ThumbnailLink != null && !candidateContent.ThumbnailLink.Equals(candidateContent.FileIconLink, StringComparison.CurrentCultureIgnoreCase))
                {
                    Task<bool> task = DownloadAndUpdateThumbnailCache(candidateContent);
                    tasks.Add(task);
                }
            }
            await Task.WhenAll(tasks);

            //Clean up for next time the loop comes in.
            tasks.Clear();
            candidateContents.Clear();

            lock (syncObject)
            {
                count = internalQueue.Count;
                if (count == 0)
                {
                    isQueueControllerRunning = false;
                    monitorEvent.Reset();
                }
            }
            await Task.Run(() => monitorEvent.WaitOne());


        }
    }

    private Content GetNextFromInternalQueue()
    {
        lock (syncObject)
        {
            Content nextContent = null;
            if (internalQueue.Count > 0)
            {
                nextContent = internalQueue[0];
                internalQueue.Remove(nextContent);
            }
            return nextContent;
        }
    }

    public void AddToUpdateQueue(Content content)
    {
        lock (syncObject)
        {
            internalQueue.Add(content);
            if (!isQueueControllerRunning)
            {
                isQueueControllerRunning = true;
                monitorEvent.Set();
            }
        }
    }

Solution

  • You should simply use TPL Dataflow. It's an actor framework on top of the TPL with an async support. Use an ActionBlock with an async action and MaxDegreeOfParallelism of 5:

    var block = new ActionBlock<Content>(
        async content => 
        {
            var tasks = interactionProviders.
                Where(provider => provider.IsServiceSupported(content.ServiceType)).
                Select(provider => provider.UpdateThumbnail(content));
            await Task.WhenAll(tasks);
    
            if (content.ThumbnailLink != null && !content.ThumbnailLink.Equals(
                content.FileIconLink, 
                StringComparison.CurrentCultureIgnoreCase))
            {
                await DownloadAndUpdateThumbnailCache(content);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5});
    

    foreach (var content in GetContent())
    {
        block.Post(content);
    }
    
    block.Complete();
    await block.Completion