I would like to read from a queue and print the elements in the same order in which they were pushed. For example:
(order is from left to right) Queue -> [{1, "Single"}, {2, "Single"}, {1, "Married"}, {1, "Divorced"}, {2, "Married"}, {2, "Divorced"}, {1, "Widow"}, {2, "Widow"}]
My goal is to synchronize the threads so that whenever two threads are running PrintInOrder, one with the argument {1, "Single"} and the other with {1, "Married"}, {1, "Single"} is printed before {1, "Married"} because it comes first in the queue.
I tried to solve it with the following code:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // needed for SemaphoreSlim and Thread
using System.Threading.Tasks;

namespace ConsoleApp
{
    public class PrintUsersQueueInOrderMutlithreaded
    {
        public static void Main()
        {
            var printUsersQueueInOrderMutlithreaded =
                new PrintUsersQueueInOrderMutlithreaded();
            printUsersQueueInOrderMutlithreaded.PopulateQueue();
            printUsersQueueInOrderMutlithreaded.ReadFromQueue();
            Console.ReadLine();
        }

        // One semaphore per user id, created lazily in PrintInOrder.
        private Dictionary<int, SemaphoreSlim> userLocks;
        private Queue<User> usersQueue;
        private string[] maritalStatus = { "Married", "Divorced", "Widow" };

        public PrintUsersQueueInOrderMutlithreaded()
        {
            usersQueue = new Queue<User>();
            userLocks = new Dictionary<int, SemaphoreSlim>();
        }

        internal void PopulateQueue()
        {
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Single" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Single" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Married" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Divorced" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Married" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Divorced" });
            usersQueue.Enqueue(new User() { Id = 1, MaritalStatus = "Widow" });
            usersQueue.Enqueue(new User() { Id = 2, MaritalStatus = "Widow" });
        }

        internal void ReadFromQueue()
        {
            List<Task> tasks = new List<Task>();
            while (usersQueue.Count > 0)
            {
                var user = usersQueue.Dequeue();
                // Each user is handed to its own task; the tasks may start in any order.
                tasks.Add(Task.Run(() => PrintInOrder(user)));
            }
            Task.WaitAll(tasks.ToArray());
        }

        private void PrintInOrder(User user)
        {
            SemaphoreSlim userLock;
            lock (userLocks)
            {
                if (!userLocks.ContainsKey(user.Id))
                {
                    userLocks[user.Id] = new SemaphoreSlim(1, 1);
                }
                userLock = userLocks[user.Id];
            }

            // Serializes printing per user id, but does not decide which task gets in first.
            userLock.Wait();
            try
            {
                Console.WriteLine(user);
            }
            finally
            {
                userLock.Release();
            }
        }
    }

    internal class User
    {
        public int Id { get; set; }
        public string MaritalStatus { get; set; }

        public override string ToString()
        {
            return $"Thread id {Thread.CurrentThread.ManagedThreadId,2}" +
                   $" User id {Id,2} Marital status {MaritalStatus,2}";
        }
    }
}
It makes me wonder whether it is possible to control the order in which PrintInOrder executes.
Thanks!
The moment you do Task.Run, you're making the order indeterminate by creating multiple parallel flows, and you cannot recover from that; if you want to preserve order, you need to retain order. Also be careful re thread-safety: usersQueue is a Queue<T>, which is not a thread-safe object, so it must never be accessed concurrently. If this was me, I would switch to something like a Channel<User>, and have one reader that simply dequeues from that (asynchronously, or even via await foreach over ReadAllAsync() on its Reader), and have that single reader do all the writes to the console.
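A minimal sketch of that single-reader idea, assuming .NET Core 3.0 or later (where System.Threading.Channels and await foreach are available) and an unbounded channel. The ChannelOrderDemo class, its RunAsync method, and the returned list of printed lines are hypothetical names introduced only for illustration; they are not from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

internal sealed class User
{
    public int Id { get; set; }
    public string MaritalStatus { get; set; } = "";

    public override string ToString() =>
        $"User id {Id,2} Marital status {MaritalStatus}";
}

// Hypothetical demo class: one writer, one reader, order preserved.
public static class ChannelOrderDemo
{
    // Writes the sample users to a channel and drains it with ONE reader.
    // Returns the printed lines so the order can be inspected.
    public static async Task<List<string>> RunAsync()
    {
        // Assumption: an unbounded channel is acceptable for this workload.
        var channel = Channel.CreateUnbounded<User>(
            new UnboundedChannelOptions { SingleReader = true });

        var printed = new List<string>();

        // The only consumer. Because nothing else reads from the channel,
        // items are handled in exactly the order they were written.
        Task reader = Task.Run(async () =>
        {
            await foreach (User user in channel.Reader.ReadAllAsync())
            {
                string line = user.ToString();
                Console.WriteLine(line);
                printed.Add(line);
            }
        });

        // Producer: same sequence as PopulateQueue in the question.
        var users = new[]
        {
            (1, "Single"), (2, "Single"), (1, "Married"), (1, "Divorced"),
            (2, "Married"), (2, "Divorced"), (1, "Widow"), (2, "Widow"),
        };
        foreach (var (id, status) in users)
        {
            await channel.Writer.WriteAsync(
                new User { Id = id, MaritalStatus = status });
        }

        // Signal that no more items will arrive so the reader loop can end.
        channel.Writer.Complete();
        await reader;
        return printed;
    }

    public static async Task Main() => await RunAsync();
}
```

With a single reader the lines come out in enqueue order, so each user's statuses appear in the order they were pushed. The trade-off is that printing is no longer parallel; if per-user parallelism matters, one option would be a separate channel and reader per user id, which this sketch does not attempt.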