c# multithreading concurrency queue locking

Print queue contents in order, multithreaded


I would like to read from a queue and print its elements in the same order in which they were pushed, for example:

(order is from left to right) Queue -> [{1, "Single"}, {2, "Single"}, {1, "Married"}, {1, "Divorced"}, {2, "Married"}, {2, "Divorced"}, {1, "Widow"}, {2, "Widow"}]

My goal is to synchronize the threads so that whenever two threads are running PrintInOrder, one with argument {1, "Single"} and the other with {1, "Married"}, {1, "Single"} is printed before {1, "Married"} because it comes first in the queue.

I tried to solve it with the following code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // needed for SemaphoreSlim and Thread
using System.Threading.Tasks;

namespace ConsoleApp
{
    public class PrintUsersQueueInOrderMutlithreaded
    {
        public static void Main()
        {
            var printUsersQueueInOrderMutlithreaded =
                new PrintUsersQueueInOrderMutlithreaded();
            printUsersQueueInOrderMutlithreaded.PopulateQueue();
            printUsersQueueInOrderMutlithreaded.ReadFromQueue();

            Console.ReadLine();
        }

        private Dictionary<int, SemaphoreSlim> userLocks;
        private Queue<User> usersQueue;

        private string[] maritalStatus = { "Married", "Divorced", "Widow" };

        public PrintUsersQueueInOrderMutlithreaded()
        {
            usersQueue = new Queue<User>();
            userLocks = new Dictionary<int, SemaphoreSlim>();
        }

        internal void PopulateQueue()
        {
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Single" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Single" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Married" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Divorced" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Married" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Divorced" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Widow" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Widow" });
        }

        internal void ReadFromQueue()
        {
            List<Task> tasks = new List<Task>();
            while (usersQueue.Count > 0)
            {
                var user = usersQueue.Dequeue();
                tasks.Add(Task.Run(() => PrintInOrder(user)));
            }

            Task.WaitAll(tasks.ToArray());
        }

        private void PrintInOrder(User user)
        {
            SemaphoreSlim userLock;
            lock (userLocks)
            {
                if (!userLocks.ContainsKey(user.Id))
                {
                    userLocks[user.Id] = new SemaphoreSlim(1, 1);
                }
                userLock = userLocks[user.Id];
            }

            userLock.Wait();
            try
            {
                Console.WriteLine(user);
            }
            finally
            {
                userLock.Release();
            }
        }
    }

    internal class User
    {
        public int Id { get; set; }
        public string MaritalStatus { get; set; }

        public override string ToString()
        {
            return $"Thread id {Thread.CurrentThread.ManagedThreadId,2}" +
                $" User id {Id,2} Marital status {MaritalStatus,2}";
        }
    }
}

It makes me wonder whether it is possible to control the order in which PrintInOrder executes.

Thanks!


Solution

  • The moment you call Task.Run, you make the order indeterminate by creating multiple parallel flows, and you cannot recover from that; if you want to preserve order, you need to retain order. The per-user SemaphoreSlim does not help here: it only stops two tasks for the same user from printing at the same time, it does not decide which of them gets in first. Also be careful with thread-safety: usersQueue is a Queue<User>, which is not a thread-safe object; the code shown only dequeues on the calling thread, but any concurrent access to it would be broken. If this was me, I would switch to something like a Channel<User>, and have one reader that simply dequeues from that (asynchronously, or even via await foreach on the reader's ReadAllAsync()), and have that single reader do all the writes to the console.
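
    For illustration, the single-reader idea could look roughly like the sketch below. This is not the asker's code rewritten line by line: the class name ChannelPrintSketch and the helper methods ProduceAsync and ConsumeAsync are made up for this example, User is simplified to a record, and an unbounded channel is assumed. The await foreach uses ChannelReader<T>.ReadAllAsync() from System.Threading.Channels (.NET Core 3.0 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Simplified stand-in for the User class from the question (assumption).
public record User(int Id, string MaritalStatus);

// Hypothetical helper class, named for this sketch only.
public static class ChannelPrintSketch
{
    // Writes the users to the channel in the given order, then marks the
    // channel as complete so the reader's loop can finish.
    public static async Task ProduceAsync(ChannelWriter<User> writer, IEnumerable<User> users)
    {
        foreach (var user in users)
        {
            await writer.WriteAsync(user);
        }
        writer.Complete();
    }

    // The single reader: it is the only code that dequeues and the only code
    // that prints, so items are printed in the order they were written.
    // The printed lines are also returned so the order can be checked.
    public static async Task<List<string>> ConsumeAsync(ChannelReader<User> reader)
    {
        var printed = new List<string>();
        await foreach (var user in reader.ReadAllAsync())
        {
            var line = $"User id {user.Id,2} Marital status {user.MaritalStatus}";
            Console.WriteLine(line);
            printed.Add(line);
        }
        return printed;
    }

    public static async Task Main()
    {
        // SingleReader is only an optimization hint; ordering comes from
        // having exactly one consumer loop.
        var channel = Channel.CreateUnbounded<User>(
            new UnboundedChannelOptions { SingleReader = true });

        var users = new[]
        {
            new User(1, "Single"), new User(2, "Single"),
            new User(1, "Married"), new User(1, "Divorced"),
            new User(2, "Married"), new User(2, "Divorced"),
            new User(1, "Widow"), new User(2, "Widow"),
        };

        var consumer = ConsumeAsync(channel.Reader);
        await ProduceAsync(channel.Writer, users);
        await consumer;
    }
}
```

    With one producer and one consumer, the eight entries should come out in exactly the order they were written. The trade-off is that printing is no longer parallel; if the per-item work were expensive, it would have to be split from the ordered output step, which this sketch does not attempt.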