I am using Parallel.ForEach to add values to a BlockingCollection. When the collection holds 10 values, I need to do some processing with them, then clear the collection and start adding values to it again.
There are two problems here:
1. While I am processing those values, other threads keep adding to the collection. I can put a lock on the list, but by the time a thread reaches the lock the count has already increased. Locking also defeats the purpose of parallel programming: I want additions to continue while those 10 messages are being processed. I could copy the list's contents and empty it, but that has the same issue: I cannot copy exactly 10 items because the contents have already changed.
2. Sometimes the if condition is never satisfied, because the count has already gone past 10 before the condition is checked.
Is there any solution to this ?
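using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
    public static BlockingCollection<string> failedMessages = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        var myCollection = new List<string>();
        myCollection.Add("test");
        // Consider myCollection having more than 100 items

        Parallel.ForEach(myCollection, item =>
        {
            failedMessages.Add(item);
            // Race: other threads may add items between Add and this check,
            // so Count can jump from 9 to 11 and the condition is never met.
            if (failedMessages.Count == 10)
            {
                DoSomething();
            }
        });
    }

    public static void DoSomething()
    {
        // do some operation with failedMessages
        // Race: other threads keep adding to the old instance while it is replaced here.
        failedMessages = new BlockingCollection<string>();
    }
}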
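The locking idea from the question can work if the lock covers only the add, the count check and the hand-off of a full batch, not the processing itself. A minimal sketch, assuming a hypothetical BatchBuffer class (not part of the original question or answer) and a processBatch callback that is safe to call from several threads at once:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question or answer.
public sealed class BatchBuffer
{
    private readonly object gate = new object();
    private readonly int batchSize;
    private readonly Action<string[]> processBatch;
    private readonly List<string> buffer;

    public BatchBuffer(int batchSize, Action<string[]> processBatch)
    {
        this.batchSize = batchSize;
        this.processBatch = processBatch;
        buffer = new List<string>(batchSize);
    }

    public void Add(string item)
    {
        string[] full = null;
        lock (gate)
        {
            buffer.Add(item);
            // Add, count check and copy happen under the same lock, so exactly
            // one thread sees the buffer reach batchSize and no extra item slips in.
            if (buffer.Count == batchSize)
            {
                full = buffer.ToArray();
                buffer.Clear();
            }
        }
        // The (possibly slow) processing runs outside the lock,
        // so other threads can keep adding in the meantime.
        if (full != null)
        {
            processBatch(full);
        }
    }

    // Returns the remaining items (fewer than batchSize) and empties the buffer.
    public string[] Flush()
    {
        lock (gate)
        {
            string[] rest = buffer.ToArray();
            buffer.Clear();
            return rest;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        int processed = 0;
        var batches = new BatchBuffer(10, batch => Interlocked.Add(ref processed, batch.Length));

        Parallel.For(0, 103, i => batches.Add("Test " + i));
        string[] leftover = batches.Flush();

        Console.WriteLine("{0} in full batches, {1} left over", processed, leftover.Length);
    }
}
```

The lock is held only for a few list operations, so the threads mostly run in parallel; the batches themselves may be processed concurrently, which is why processBatch has to be thread-safe. Call Flush() at the end to pick up the items that never filled a whole batch.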
This looks like a job for TPL Dataflow. Here is an example using a BatchBlock<string> with a batch size of 10 and an ActionBlock<string[]> to consume the batches:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Program
{
    public static void Main()
    {
        // Set up the Dataflow blocks
        var batcher = new BatchBlock<string>(10);
        var consumer = new ActionBlock<string[]>(
            msgs => Console.WriteLine("Processed {0} messages.", msgs.Length));

        // Put them together; PropagateCompletion forwards Complete() from batcher to consumer
        batcher.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        // Start posting from many threads; Dataflow blocks accept concurrent posts
        Parallel.For(0, 103, i => batcher.Post(string.Format("Test {0}", i)));

        // Shutdown: Complete() emits the remaining 3 items as a final, smaller batch;
        // then wait until the consumer has processed every batch.
        batcher.Complete();
        consumer.Completion.Wait();
    }
}
In action: https://dotnetfiddle.net/Y9Ezg4
Further Reading: https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-using-batchblock-and-batchedjoinblock-to-improve-efficiency
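To see what happens to the 3 messages left over when 103 are posted in batches of 10, here is a small sketch built on the same two blocks. CollectBatchSizes is a hypothetical helper name, and the sketch assumes the System.Threading.Tasks.Dataflow package is available to your project (depending on the target framework you may need to add it from NuGet):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchDemo
{
    // Hypothetical helper: posts itemCount strings through a BatchBlock and returns
    // the size of every batch the consumer saw, in the order it processed them.
    public static List<int> CollectBatchSizes(int itemCount, int batchSize)
    {
        var sizes = new List<int>();
        var batcher = new BatchBlock<string>(batchSize);
        // ActionBlock's default MaxDegreeOfParallelism is 1, so batches are handled
        // one at a time and the List<int> needs no extra locking.
        var consumer = new ActionBlock<string[]>(batch => sizes.Add(batch.Length));
        batcher.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        Parallel.For(0, itemCount, i => batcher.Post("Test " + i));

        // Complete() flushes the leftover items as a final, smaller batch;
        // PropagateCompletion lets us wait for the consumer itself.
        batcher.Complete();
        consumer.Completion.Wait();
        return sizes;
    }

    public static void Main()
    {
        List<int> sizes = CollectBatchSizes(103, 10);
        Console.WriteLine(string.Join(",", sizes));
    }
}
```

With 103 items this yields ten batches of 10 followed by one batch of 3. Waiting on batcher.Completion alone, without PropagateCompletion, would not guarantee that the consumer has finished that last batch.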
EDIT: As requested, if you cannot or do not want to use Dataflow, you can of course do something similar yourself:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        var fmh = new FailedMessageHandler(new Progress<string[]>(list =>
            Console.WriteLine("Handling {0} messages. [{1}]", list.Length, string.Join(",", list))));

        Parallel.For(0, 52, i => fmh.Add(string.Format("Test {0,3}", i)));

        Thread.Sleep(1500); // Demo: let the 1-second timeout flush the partial batch

        // Parallel.For blocks until every iteration has finished, so no polling loop is needed.
        Parallel.For(52, 107, i => fmh.Add(string.Format("Test {0,3}", i)));

        // Graceful shutdown:
        fmh.CompleteAdding();
        fmh.AwaitCompletion();
    }
}
public class FailedMessageHandler
{
    private readonly BlockingCollection<string> workQueue = new BlockingCollection<string>();
    private readonly List<string> currentBuffer = new List<string>(10); // only touched by workThread
    private readonly IProgress<string[]> progress;
    private readonly Thread workThread;

    public FailedMessageHandler(IProgress<string[]> progress)
    {
        this.progress = progress;
        workThread = new Thread(WatchDog);
        workThread.Start();
    }

    public void Add(string failedMessage)
    {
        if (workQueue.IsAddingCompleted)
        {
            throw new InvalidOperationException("Adding is completed!");
        }
        workQueue.Add(failedMessage);
    }

    private void WatchDog()
    {
        while (true)
        {
            // Demo: include a timeout. If no new item arrives within 1 second,
            // send whatever has been collected so far, even if it is fewer than 10.
            // The CancellationTokenSource owns a timer, so dispose it every iteration.
            using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
            {
                try
                {
                    var failedMsg = workQueue.Take(timeout.Token);
                    currentBuffer.Add(failedMsg);
                    if (currentBuffer.Count >= 10)
                    {
                        Flush();
                    }
                }
                catch (OperationCanceledException)
                {
                    // Timeout: handle the partial batch, if any.
                    Console.WriteLine("TIMEOUT!");
                    Flush();
                }
                catch (InvalidOperationException)
                {
                    // Take throws this once CompleteAdding was called and the queue is empty.
                    Console.WriteLine("COMPLETED!");
                    Flush(); // handle remaining items
                    break;
                }
            }
        }
        Console.WriteLine("DONE!");
    }

    // Reports the buffered items (if any) and starts a new batch.
    private void Flush()
    {
        if (currentBuffer.Count > 0)
        {
            progress.Report(currentBuffer.ToArray());
            currentBuffer.Clear();
        }
    }

    public void CompleteAdding()
    {
        workQueue.CompleteAdding();
    }

    public void AwaitCompletion()
    {
        workThread?.Join();
    }
}
In Action: https://dotnetfiddle.net/H2Rg35
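Mind that Progress<T> invokes its handler through the SynchronizationContext that was current when it was created. In a UI application that is the UI thread; in a console application such as this demo there is none, so the handler runs on a thread-pool thread, and reports still queued when Main returns may never be printed. If you pass an Action<string[]> instead and invoke it directly from WatchDog, it executes on workThread. So, tweak the example to your requirements.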
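If you want the handler to run on workThread without changing FailedMessageHandler, one option is a small IProgress<T> implementation that calls an Action<T> inline. This is a sketch; SynchronousProgress<T> is a hypothetical name, not a framework type:

```csharp
using System;
using System.Threading;

// Hypothetical adapter, not part of the original answer: an IProgress<T>
// that invokes the handler directly on whichever thread calls Report.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> handler;

    public SynchronousProgress(Action<T> handler)
    {
        this.handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // No SynchronizationContext involved: the handler runs inline,
    // and Report returns only after it has finished.
    public void Report(T value) => handler(value);
}

public static class Program
{
    public static void Main()
    {
        int reportThread = -1;
        IProgress<string[]> progress = new SynchronousProgress<string[]>(
            batch => reportThread = Thread.CurrentThread.ManagedThreadId);

        var worker = new Thread(() => progress.Report(new[] { "a", "b" }));
        worker.Start();
        worker.Join();

        Console.WriteLine(reportThread == worker.ManagedThreadId
            ? "Handler ran on the worker thread"
            : "Handler ran elsewhere");
    }
}
```

Passed as new FailedMessageHandler(new SynchronousProgress<string[]>(list => ...)), each batch is handled on workThread, in order, and all reports are finished by the time AwaitCompletion returns. The trade-off is that WatchDog does not take the next message until the handler returns, so a slow handler slows down draining while Add keeps enqueueing.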
This is also just to give an idea; there are many variations of this, for example using Task/async.
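As one possible Task-based variation, the sketch below replaces the dedicated Thread with a LongRunning task and uses BlockingCollection.TryTake with a timeout, instead of catching exceptions, to detect both the idle timeout and completion. TaskBatchHandler, its constructor parameters and CompleteAsync are hypothetical names chosen for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical Task-based variant of FailedMessageHandler.
public sealed class TaskBatchHandler
{
    private readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    private readonly Task worker;

    public TaskBatchHandler(int batchSize, TimeSpan idleTimeout, Action<string[]> handleBatch)
    {
        // LongRunning hints the scheduler to use a dedicated thread, since the loop blocks.
        worker = Task.Factory.StartNew(() => Run(batchSize, idleTimeout, handleBatch),
            TaskCreationOptions.LongRunning);
    }

    // Throws InvalidOperationException once CompleteAsync has been called.
    public void Add(string message) => queue.Add(message);

    // Stops accepting messages; the returned task finishes after the last batch is handled.
    public Task CompleteAsync()
    {
        queue.CompleteAdding();
        return worker;
    }

    private void Run(int batchSize, TimeSpan idleTimeout, Action<string[]> handleBatch)
    {
        var buffer = new List<string>(batchSize);
        while (true)
        {
            // TryTake returns false on timeout and when the queue is completed and empty.
            if (queue.TryTake(out string msg, idleTimeout))
            {
                buffer.Add(msg);
                if (buffer.Count < batchSize) continue; // batch not full yet
            }
            else if (buffer.Count == 0)
            {
                // Nothing buffered: stop if the queue is finished, otherwise keep waiting.
                if (queue.IsCompleted) return;
                continue;
            }
            // Either the batch is full, or we timed out / completed with a partial batch.
            handleBatch(buffer.ToArray());
            buffer.Clear();
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var handler = new TaskBatchHandler(10, TimeSpan.FromSeconds(1),
            batch => Console.WriteLine("Handling {0} messages.", batch.Length));

        Parallel.For(0, 25, i => handler.Add("Test " + i));
        handler.CompleteAsync().Wait();
    }
}
```

Because TryTake reports timeout and completion through its return value, the partial batch is flushed in either case and the loop exits once IsCompleted is true; the handler runs on the worker task's thread, so the same threading considerations as with an Action apply.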