Search code examples
c#multithreadingmanualresetevent

c# multiple threads waiting for a ManualResetEvent


I'm messing around with multithreading and making some sort of task engine. The idea is that the engine can have a configurable amount of threads waiting and when a new task arrives the first free thread picks it up and executes it.

The problem is that something 2 threads pickup the same task somehow. I looked it through and I think that this code should work but obviously it doesn't. If I add the 10ms sleep where it is now commented out it works, but I'm not sure I understand why. It looks like the .Reset() function returns before it actually resets the event?

Can somebody explain? Is there a better way to let only a single thread continue when there are multiple waiting?

Thanks

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTest
{
    public class Engine
    {
        private ManualResetEvent taskEvent;
        private ConcurrentQueue<Task> tasks;
        private bool running;
        private List<Thread> threads;
        private int threadAmount;
        private int threadsBusy = 0;

        public Engine(int amountOfThreads)
        {
            taskEvent = new ManualResetEvent(false);
            tasks = new ConcurrentQueue<Task>();
            threads = new List<Thread>();

            threadAmount = amountOfThreads;
        }

        public void Start()
        {
            running = true;
            for (var i = 0; i < threadAmount; i++)
            {
                var thread = new Thread(Process);
                thread.Name = "Thread " + i;
                threads.Add(thread);
                thread.Start();
            }
        }

        public void Stop()
        {
            running = false;
            taskEvent.Set();
            threads.ForEach(t => t.Join());
        }

        private void Process()
        {
            while (running)
            {
                lock (taskEvent)
                {
                    // Lock it so only a single thread is waiting on the event at the same time
                    taskEvent.WaitOne();
                    taskEvent.Reset();
                    //Thread.Sleep(10);
                }

                if (!running)
                {
                    taskEvent.Set();
                    return;
                }

                threadsBusy += 1;
                if (threadsBusy > 1)
                    Console.WriteLine("Failed");

                Task task;
                if (tasks.TryDequeue(out task))
                    task.Execute();

                threadsBusy -= 1;
            }
        }

        public void Enqueue(Task t)
        {
            tasks.Enqueue(t);
            taskEvent.Set();
        }
    }
}

EDIT Rest of the code:

namespace TaskTest
{
    public class Start
    {
        public static void Main(params string[] args)
        {
            var engine = new Engine(4);
            engine.Start();

            while (true)
            {
                Console.Read();
                engine.Enqueue(new Task());
            }
        }
    }
}


namespace TaskTest
{
    public class Task
    {
        public void Execute()
        {
            Console.WriteLine(Thread.CurrentThread.Name);
        }
    }
}

Solution

  • When using Console.Read() on a key press, two characters are read from the input. You should use Console.ReadLine() instead.

    Note that your code can be simplified a lot by using a BlockingCollection to handle the synchronization:

    public class Engine
    {
        private BlockingCollection<Task> tasks;
        private List<Thread> threads;
        private int threadAmount;
    
        public Engine(int amountOfThreads)
        {
            tasks = new BlockingCollection<Task>();
            threads = new List<Thread>();
    
            threadAmount = amountOfThreads;
        }
    
        public void Start()
        {
            for (var i = 0; i < threadAmount; i++)
            {
                var thread = new Thread(Process);
                thread.Name = "Thread " + i;
                threads.Add(thread);
                thread.Start();
            }
        }
    
        public void Stop()
        {
            tasks.CompleteAdding();
            threads.ForEach(t => t.Join());
        }
    
        private void Process()
        {
            foreach (var task in tasks.GetConsumingEnumerable())
            {
                task.Execute();
            }
        }
    
        public void Enqueue(Task t)
        {
            tasks.Add(t);
        }
    }