I am trying to implement producer/consumer pattern with multiple or parallel consumers.
I did an implementation but I would like to know how good it is. Can somebody do better? Can any of you spot any errors?
Unfortunately I can not use TPL dataflow, because we are at the end of our project and to put in an extra library in our package would take to much paperwork and we do not have that time.
What I am trying to do is to speed up the following portion:
anIntermediaryList = StepOne(anInputList); // I will put StepOne as Producer :-) Step one is remote call.
aResultList = StepTwo(anIntermediaryList); // I will put StepTwo as Consumer, however he also produces result. Step two is also a remote call.
// StepOne is way faster than StepTwo.
For this I came up with the idea that I will chunk the input list (anInputList)
StepOne will be inside of a Producer and will put the intermediary chunks into a queue. There will be multiple Producers and they will take the intermediary results and process it with StepTwo.
Here is a simplified version of of the implementation later:
Task.Run(() => {
aChunkinputList = Split(anInputList)
foreach(aChunk in aChunkinputList)
{
anIntermediaryResult = StepOne(aChunk)
intermediaryQueue.Add(anIntermediaryResult)
}
})
while(intermediaryQueue.HasItems)
{
anItermediaryResult = intermediaryQueue.Dequeue()
Task.Run(() => {
aResultList = StepTwo(anItermediaryResult);
resultQueue.Add(aResultList)
}
}
I also thought that the best number for the parallel running Consumers would be: "Environment.ProcessorCount / 2". I would like to know if this also is a good idea.
Now here is my mock implementation and the question is can somebody do better or spot any error?
class Example
{
protected static readonly int ParameterCount_ = 1000;
protected static readonly int ChunkSize_ = 100;
// This might be a good number for the parallel consumers.
protected static readonly int ConsumerCount_ = Environment.ProcessorCount / 2;
protected Semaphore mySemaphore_ = new Semaphore(Example.ConsumerCount_, Example.ConsumerCount_);
protected ConcurrentQueue<List<int>> myIntermediaryQueue_ = new ConcurrentQueue<List<int>>();
protected ConcurrentQueue<List<int>> myResultQueue_ = new ConcurrentQueue<List<int>>();
public void Main()
{
List<int> aListToProcess = new List<int>(Example.ParameterCount_ + 1);
aListToProcess.AddRange(Enumerable.Range(0, Example.ParameterCount_));
Task aProducerTask = Task.Run(() => Producer(aListToProcess));
List<Task> aTaskList = new List<Task>();
while(!aProducerTask.IsCompleted || myIntermediaryQueue_.Count > 0)
{
List<int> aChunkToProcess;
if (myIntermediaryQueue_.TryDequeue(out aChunkToProcess))
{
mySemaphore_.WaitOne();
aTaskList.Add(Task.Run(() => Consumer(aChunkToProcess)));
}
}
Task.WaitAll(aTaskList.ToArray());
List<int> aResultList = new List<int>();
foreach(List<int> aChunk in myResultQueue_)
{
aResultList.AddRange(aChunk);
}
aResultList.Sort();
if (aListToProcess.SequenceEqual(aResultList))
{
Console.WriteLine("All good!");
}
else
{
Console.WriteLine("Bad, very bad!");
}
}
protected void Producer(List<int> elements_in)
{
List<List<int>> aChunkList = Example.SplitList(elements_in, Example.ChunkSize_);
foreach(List<int> aChunk in aChunkList)
{
Console.WriteLine("Thread Id: {0} Producing from: ({1}-{2})",
Thread.CurrentThread.ManagedThreadId,
aChunk.First(),
aChunk.Last());
myIntermediaryQueue_.Enqueue(ProduceItemsRemoteCall(aChunk));
}
}
protected void Consumer(List<int> elements_in)
{
Console.WriteLine("Thread Id: {0} Consuming from: ({1}-{2})",
Thread.CurrentThread.ManagedThreadId,
Convert.ToInt32(Math.Sqrt(elements_in.First())),
Convert.ToInt32(Math.Sqrt(elements_in.Last())));
myResultQueue_.Enqueue(ConsumeItemsRemoteCall(elements_in));
mySemaphore_.Release();
}
// Dummy Remote Call
protected List<int> ProduceItemsRemoteCall(List<int> elements_in)
{
return elements_in.Select(x => x * x).ToList();
}
// Dummy Remote Call
protected List<int> ConsumeItemsRemoteCall(List<int> elements_in)
{
return elements_in.Select(x => Convert.ToInt32(Math.Sqrt(x))).ToList();
}
public static List<List<int>> SplitList(List<int> masterList_in, int chunkSize_in)
{
List<List<int>> aReturnList = new List<List<int>>();
for (int i = 0; i < masterList_in.Count; i += chunkSize_in)
{
aReturnList.Add(masterList_in.GetRange(i, Math.Min(chunkSize_in, masterList_in.Count - i)));
}
return aReturnList;
}
}
Main function:
class Program
{
static void Main(string[] args)
{
Example anExample = new Example();
anExample.Main();
}
}
Bye Laszlo
Based on the comments I've posted a second and third version: https://codereview.stackexchange.com/questions/71182/producer-consumer-in-c-with-multiple-parallel-consumers-and-no-tpl-dataflow/71233#71233