I have a simple scenario with two threads. The first thread continuously reads some data and enqueues it into a queue. The second thread first peeks at a single object in that queue and performs some conditional checks. If the checks pass, the object is dequeued and passed on for processing.
I have tried to use ConcurrentQueue<T>, which is a thread-safe implementation of a simple queue, but the problem with it is that all calls are blocking. This means that if the first thread is enqueuing an object, the second thread can't peek at or dequeue an object.
In my situation I need to enqueue at the end and dequeue from the beginning of the queue at the same time.
The C# lock statement would also block.
So my question is whether it is possible to perform both of these operations in parallel, in a thread-safe way, without them blocking each other.
Here are my first attempts, as a simplified example of my problem.
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Scenario {
    public class Program {
        public static void Main(string[] args) {
            Scenario scenario = new Scenario();
            scenario.Start();
            Console.ReadKey();
        }

        public class Scenario {
            public Scenario() {
                someData = new Queue<int>();
            }

            public void Start() {
                Task.Factory.StartNew(firstThread);
                Task.Factory.StartNew(secondThread);
            }

            // Producer: continuously enqueues random values.
            private void firstThread() {
                Random random = new Random();
                while (true) {
                    int newData = random.Next(1, 100);
                    someData.Enqueue(newData);
                    Console.WriteLine("Enqueued " + newData);
                }
            }

            // Consumer: peeks at the head and dequeues it only if the check passes.
            private void secondThread() {
                Random random = new Random();
                while (true) {
                    if (someData.Count == 0) {
                        continue;
                    }
                    int singleData = someData.Peek();
                    int someValue = random.Next(1, 100);
                    if (singleData > someValue || singleData == 1 || singleData == 99) {
                        singleData = someData.Dequeue();
                        Console.WriteLine("Dequeued " + singleData);
                        // ... processing ...
                    }
                }
            }

            // NOTE: Queue<T> is not thread-safe; using it from both tasks
            // without synchronization is a data race.
            private readonly Queue<int> someData;
        }
    }
}
```
Second example:
```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Scenario {
    public Scenario() {
        someData = new ConcurrentQueue<int>();
    }

    public void Start() {
        Task.Factory.StartNew(firstThread);
        Task.Factory.StartNew(secondThread);
    }

    // Producer: continuously enqueues random values.
    private void firstThread() {
        Random random = new Random();
        while (true) {
            int newData = random.Next(1, 100);
            someData.Enqueue(newData);
            lock (syncRoot) { Console.WriteLine($"Enqueued {enqued++} Dequeued {dequed}"); }
        }
    }

    // Consumer: with a single consumer thread, the item seen by TryPeek is
    // still at the head when TryDequeue runs, so peek-then-dequeue is safe here.
    private void secondThread() {
        Random random = new Random();
        while (true) {
            if (!someData.TryPeek(out int singleData)) {
                continue; // queue is empty: spin and try again
            }
            int someValue = random.Next(1, 100);
            if (singleData > someValue || singleData == 1 || singleData == 99) {
                if (!someData.TryDequeue(out singleData)) {
                    continue;
                }
                lock (syncRoot) { Console.WriteLine($"Enqueued {enqued} Dequeued {dequed++}"); }
                // ... processing ...
            }
        }
    }

    // Counters shared by both threads; only accessed inside lock (syncRoot).
    private int enqued = 0;
    private int dequed = 0;
    private readonly ConcurrentQueue<int> someData;
    private static readonly object syncRoot = new object();
}
```
First off: I strongly encourage you to reconsider whether your technique of having multiple threads and a shared memory data structure is even the right approach at all. Code that has multiple threads of control sharing access to data structures is hard to get right, and failures can be subtle, catastrophic, and hard to debug.
Second: If you are bent upon multiple threads and a shared memory data structure, I strongly encourage you to use designed-by-experts data types like concurrent queues, rather than rolling your own.
Now that I've got those warnings out of the way, here is a way to address your concern. It is sufficiently complicated that you should obtain the services of an expert on the C# memory model to verify the correctness of your solution if you go with this. I would not consider myself competent to implement the scheme I'm about to describe, not without the help of someone who is actually an expert on the memory model.
The goal is to have a queue that supports simultaneous enqueue and dequeue operations and low lock contention.
What you want is two immutable stack variables, called the enqueue stack and the dequeue stack, each with its own lock.
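The enqueue operation is:
1. Take the enqueue lock.
2. Push the item onto the enqueue stack (with an immutable stack, this produces a new stack that replaces the old one in the enqueue stack variable).
3. Release the enqueue lock.

The dequeue operation is:
1. Take the dequeue lock.
2. If the dequeue stack is empty, take the enqueue lock, move every item from the enqueue stack onto the dequeue stack (which reverses their order, so the oldest item ends up on top), set the enqueue stack to empty, and release the enqueue lock.
3. If the dequeue stack is still empty, the queue is empty and the dequeue fails; otherwise, pop the top item off the dequeue stack.
4. Release the dequeue lock.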
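Here is a minimal sketch of the enqueue and dequeue operations of this scheme, assuming a hand-rolled immutable stack (a chain of immutable nodes, with null standing for the empty stack) rather than any particular library type. TwoStackQueue, Node, Enqueue and TryDequeue are hypothetical names chosen for this illustration, and, per the warnings above, it should be reviewed by someone who knows the memory model before you rely on it.

```csharp
// Sketch: a queue built from two immutable stacks, each guarded by its own lock.
// The immutable stack is hand-rolled (a chain of immutable nodes; null = empty).
// TwoStackQueue, Node, Enqueue and TryDequeue are hypothetical names.
public sealed class TwoStackQueue<T>
{
    // A node is never modified after construction, so a stack built from
    // nodes is immutable: "pushing" creates a new top node.
    private sealed class Node
    {
        public readonly T Value;
        public readonly Node Next;
        public Node(T value, Node next) { Value = value; Next = next; }
    }

    private readonly object enqueueLock = new object();
    private readonly object dequeueLock = new object();
    private Node enqueueStack; // newest item on top; guarded by enqueueLock
    private Node dequeueStack; // oldest item on top; guarded by dequeueLock

    public void Enqueue(T item)
    {
        lock (enqueueLock)
        {
            enqueueStack = new Node(item, enqueueStack);
        }
    }

    // Returns false if the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (dequeueLock)
        {
            if (dequeueStack == null)
            {
                // Locks are always taken in the order dequeueLock, enqueueLock,
                // so this cannot deadlock with Enqueue (which takes only enqueueLock).
                lock (enqueueLock)
                {
                    // The expensive O(n) step: reversing the enqueue stack
                    // puts the oldest item on top of the dequeue stack.
                    for (Node n = enqueueStack; n != null; n = n.Next)
                    {
                        dequeueStack = new Node(n.Value, dequeueStack);
                    }
                    enqueueStack = null;
                }
            }
            if (dequeueStack == null)
            {
                item = default(T);
                return false;
            }
            item = dequeueStack.Value;
            dequeueStack = dequeueStack.Next;
            return true;
        }
    }
}
```

Every read and write of the two stack variables happens under a lock, so this version does not depend on low-lock reasoning. It reverses the enqueue stack while holding the enqueue lock, which is where the rare-but-expensive contention comes from. Because the stacks are immutable, a variant could instead grab the enqueue stack and reset it to empty under the lock, then do the reversal after releasing it.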
Note that of course if there is only one thread dequeuing, then we don't need the dequeue lock at all, but with this scheme there can be many threads dequeuing.
Suppose there are 1000 items on the enqueue stack and zero on the dequeue stack. The first time we dequeue, we do an expensive O(n) operation, reversing the enqueue stack once, but now we have 1000 items on the dequeue stack. Once the dequeue stack is big, the dequeuing thread can spend most of its time processing while the enqueuing thread spends most of its time enqueuing. Contention on the enqueue lock is rare, but expensive when it happens.
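To make that cost argument concrete, here is a single-threaded, instrumented version of the same two-stack idea that counts how many items are moved from the enqueue stack to the dequeue stack. It leaves out the locks and uses the mutable System.Collections.Generic.Stack<T> purely for brevity; MoveCountingQueue and Moves are hypothetical names for this illustration.

```csharp
using System.Collections.Generic;

// Single-threaded sketch (no locks) of the two-stack queue, instrumented to
// count moves between the stacks. MoveCountingQueue and Moves are hypothetical.
public sealed class MoveCountingQueue<T>
{
    private readonly Stack<T> enqueueStack = new Stack<T>();
    private readonly Stack<T> dequeueStack = new Stack<T>();

    // Total number of items moved from the enqueue stack to the dequeue stack.
    public int Moves { get; private set; }

    public void Enqueue(T item) => enqueueStack.Push(item);

    public T Dequeue()
    {
        if (dequeueStack.Count == 0)
        {
            // The O(n) step: reverse the whole enqueue stack in one go.
            while (enqueueStack.Count > 0)
            {
                dequeueStack.Push(enqueueStack.Pop());
                Moves++;
            }
        }
        // Throws InvalidOperationException if both stacks are empty.
        return dequeueStack.Pop();
    }
}
```

Enqueuing 1000 items and dequeuing one of them moves all 1000 items exactly once; dequeuing the remaining 999 moves nothing further, so Moves stays at 1000. Since each item is pushed once, moved at most once and popped once, n enqueues plus n dequeues cost O(n) in total, which is O(1) amortized per operation.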
Why use immutable data structures? Everything I described here would also work with mutable stacks, but (1) it is easier to reason about immutable stacks, and (2) if you want to really live dangerously, you can elide some of the locks and use interlocked swap operations instead; if you do, make sure you understand every possible reordering of operations under low-lock conditions.
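As an illustration of what eliding the locks might look like, here is a sketch that assumes a single dequeuing thread, so there is no dequeue lock. Enqueuers push onto the immutable enqueue stack with Interlocked.CompareExchange in a retry loop, and the dequeuer takes the whole enqueue stack at once with Interlocked.Exchange. LowLockQueue and its members are hypothetical names. This is exactly the kind of code the warnings above are about, so treat it as a starting point for expert review, not as a finished implementation.

```csharp
using System.Threading;

// Low-lock sketch: any number of enqueuing threads, exactly ONE dequeuing thread.
// LowLockQueue, Node, Enqueue and TryDequeue are hypothetical names.
public sealed class LowLockQueue<T>
{
    // Immutable node: fields are set once, in the constructor.
    private sealed class Node
    {
        public readonly T Value;
        public readonly Node Next;
        public Node(T value, Node next) { Value = value; Next = next; }
    }

    private Node enqueueStack; // shared; only accessed via Volatile/Interlocked
    private Node dequeueStack; // owned by the single dequeuing thread

    public void Enqueue(T item)
    {
        // Lock-free push: if another thread changed the top between our read
        // and our compare-and-swap, the swap fails and we retry.
        while (true)
        {
            Node top = Volatile.Read(ref enqueueStack);
            Node pushed = new Node(item, top);
            if (Interlocked.CompareExchange(ref enqueueStack, pushed, top) == top)
            {
                return;
            }
        }
    }

    // Must only ever be called from one thread.
    public bool TryDequeue(out T item)
    {
        if (dequeueStack == null)
        {
            // Atomically take the entire enqueue stack, leaving it empty,
            // then reverse it so the oldest item is on top.
            Node taken = Interlocked.Exchange(ref enqueueStack, null);
            for (Node n = taken; n != null; n = n.Next)
            {
                dequeueStack = new Node(n.Value, dequeueStack);
            }
        }
        if (dequeueStack == null)
        {
            item = default(T);
            return false;
        }
        item = dequeueStack.Value;
        dequeueStack = dequeueStack.Next;
        return true;
    }
}
```

Immutability is what keeps the compare-and-swap simple here: a node is never modified after it is published, so reading a stale top of the stack can only cause a retry, not a corrupted stack. Allowing several dequeuing threads would need the dequeue lock back (or considerably more care).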
UPDATE:
"The real problem is that I can't dequeue and process a lot of points because I am continuously reading and enqueuing new points. The enqueue calls are blocking the processing step."
Well, if that is your real problem, then mentioning it in the question instead of burying it in a comment would be a good idea. Help us help you.
There are a number of things you could do here. You could for example set the priority of the enqueuing thread lower than the priority of the dequeuing thread. Or you could have multiple dequeuing threads, as many as there are CPUs in your machine. Or you could dynamically choose to drop some enqueue operations if the dequeues are not keeping up. Without knowing a lot more about your actual problem it is hard to give advice on how to solve it.
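As one possible shape for two of these options, here is a sketch that never makes the producer wait for the consumers: it offers each item to a bounded System.Collections.Concurrent.BlockingCollection<T> with TryAdd, drops (and counts) the item when the buffer is full, and runs one consumer task per CPU. DroppingPipeline, Offer, Complete and Dropped are hypothetical names, and the capacity is whatever you choose. With several consumers, items are no longer processed in strict FIFO order, and the question's peek-then-conditionally-dequeue step does not carry over directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: bounded buffer that drops items when full, drained by one consumer
// per CPU. DroppingPipeline, Offer, Complete and Dropped are hypothetical names.
public sealed class DroppingPipeline<T>
{
    private readonly BlockingCollection<T> buffer;
    private readonly Task[] consumers;
    private int dropped;

    public DroppingPipeline(int capacity, Action<T> process)
    {
        // Bounded: at most `capacity` items wait to be processed.
        buffer = new BlockingCollection<T>(capacity);
        consumers = Enumerable.Range(0, Environment.ProcessorCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Waits (rather than spinning) while the buffer is empty, and
                // ends once CompleteAdding was called and the buffer is drained.
                foreach (T item in buffer.GetConsumingEnumerable())
                {
                    process(item);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();
    }

    public int Dropped => Volatile.Read(ref dropped);

    // Does not wait for the consumers: if the buffer is full, the item is dropped.
    public bool Offer(T item)
    {
        if (buffer.TryAdd(item)) return true;
        Interlocked.Increment(ref dropped);
        return false;
    }

    // Signals that no more items will arrive, then waits for the consumers.
    public void Complete()
    {
        buffer.CompleteAdding();
        Task.WaitAll(consumers);
    }
}
```

The producer calls Offer for each point it reads; whether dropping data is acceptable, and how large the buffer should be, depends on the actual problem.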