I need to implement a queue of requests that can be populated from multiple threads. When this queue grows beyond 1000 completed requests, these requests should be stored in the database. Here is my implementation:
public class RequestQueue
{
    private static BlockingCollection<VerificationRequest> _queue = new BlockingCollection<VerificationRequest>();
    private static ConcurrentQueue<VerificationRequest> _storageQueue = new ConcurrentQueue<VerificationRequest>();
    private static volatile bool isLoading = false;
    private static object _lock = new object();

    public static void Launch()
    {
        Task.Factory.StartNew(execute);
    }

    public static void Add(VerificationRequest request)
    {
        _queue.Add(request);
    }

    public static void AddRange(List<VerificationRequest> requests)
    {
        Parallel.ForEach(requests, new ParallelOptions() {MaxDegreeOfParallelism = 3},
            (request) => { _queue.Add(request); });
    }

    private static void execute()
    {
        Parallel.ForEach(_queue.GetConsumingEnumerable(), new ParallelOptions {MaxDegreeOfParallelism = 5}, EnqueueSaveRequest );
    }

    private static void EnqueueSaveRequest(VerificationRequest request)
    {
        _storageQueue.Enqueue( new RequestExecuter().ExecuteVerificationRequest( request ) );
        if (_storageQueue.Count > 1000 && !isLoading)
        {
            lock ( _lock )
            {
                if ( _storageQueue.Count > 1000 && !isLoading )
                {
                    isLoading = true;
                    var requestChunck = new List<VerificationRequest>();
                    VerificationRequest req;
                    for (var i = 0; i < 1000; i++)
                    {
                        if( _storageQueue.TryDequeue(out req))
                            requestChunck.Add(req);
                    }
                    new VerificationRequestRepository().InsertRange(requestChunck);
                    isLoading = false;
                }
            }
        }
    }
}
Is there any way to implement this without lock and isLoading?
The easiest way to do what you ask is to use the blocks in the TPL Dataflow library. For example:
var batchBlock = new BatchBlock<VerificationRequest>(1000);
var exportBlock = new ActionBlock<VerificationRequest[]>(records =>
{
    // Called once per batch with an array of requests
    new VerificationRequestRepository().InsertRange(records);
});
batchBlock.LinkTo(exportBlock , new DataflowLinkOptions { PropagateCompletion = true });
That's it.
You can send messages to the first block with:
batchBlock.Post(new VerificationRequest(...));
Once you have finished, you can shut down the entire pipeline and flush any leftover messages by calling batchBlock.Complete() and then awaiting the completion of the final block:
batchBlock.Complete();
await exportBlock.Completion;
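The BatchBlock collects incoming messages into arrays of 1000 items and passes each array to the next block. When Complete() is called, any remaining messages are emitted as a final, smaller batch. An ActionBlock processes one message at a time by default (its MaxDegreeOfParallelism defaults to 1), so its delegate never runs concurrently with itself. This means you can reuse a single instance of your repository without worrying about cross-thread access: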
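To make the batching and flushing behaviour concrete, here is a minimal, self-contained sketch. It posts plain integers instead of VerificationRequest objects and records batch sizes instead of calling a repository; the BatchDemo class and RunAsync method are hypothetical names used only for illustration. Depending on your target framework you may need to reference the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchDemo
{
    // Posts `total` integers from several threads and returns the size of
    // every batch that reached the final block, in arrival order.
    public static async Task<int[]> RunAsync(int total, int batchSize)
    {
        var batchSizes = new ConcurrentQueue<int>();

        var batchBlock = new BatchBlock<int>(batchSize);
        var exportBlock = new ActionBlock<int[]>(records =>
        {
            // Stand-in for repository.InsertRange(records) (assumption)
            batchSizes.Enqueue(records.Length);
        });
        batchBlock.LinkTo(exportBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Post can be called safely from multiple threads
        Parallel.For(0, total, i => { batchBlock.Post(i); });

        // Flush the incomplete last batch and wait for the export to finish
        batchBlock.Complete();
        await exportBlock.Completion;

        return batchSizes.ToArray();
    }

    public static async Task Main()
    {
        var sizes = await RunAsync(2500, 1000);
        Console.WriteLine(string.Join(", ", sizes)); // prints "1000, 1000, 500"
    }
}
```

The last batch holds only the 500 leftover items, which is why no separate "flush" code is needed when shutting down.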
var repository=new VerificationRequestRepository();
var exportBlock = new ActionBlock<VerificationRequest[]>(records =>
{
    // Safe to share: the block invokes this delegate one batch at a time
    repository.InsertRange(records);
});
Almost all blocks have an input buffer that can safely be posted to from multiple threads. Each block processes its messages on its own TPL task, so the steps of a pipeline run concurrently with one another. This means that you get asynchronous execution "for free", which matters when you have multiple linked steps, for example when you use a TransformBlock to modify the messages flowing through the pipeline.
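As a rough sketch of such a multi-step pipeline, the following puts a TransformBlock in front of the BatchBlock. The transform (taking a string's length) is only a placeholder for real work such as ExecuteVerificationRequest, and PipelineDemo and RunAsync are hypothetical names. MaxDegreeOfParallelism = 5 mirrors the degree of parallelism used in the question; by default the TransformBlock still emits its results in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    // Transforms each input, groups the results into batches and returns the batches.
    public static async Task<List<int[]>> RunAsync(IEnumerable<string> inputs, int batchSize)
    {
        var batches = new List<int[]>();

        // Step 1: process up to 5 messages concurrently (placeholder work)
        var executeBlock = new TransformBlock<string, int>(
            text => text.Length,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        // Step 2: group the results into arrays of batchSize items
        var batchBlock = new BatchBlock<int>(batchSize);

        // Step 3: one batch at a time, so the plain List needs no locking
        var exportBlock = new ActionBlock<int[]>(batch => { batches.Add(batch); });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        executeBlock.LinkTo(batchBlock, linkOptions);
        batchBlock.LinkTo(exportBlock, linkOptions);

        foreach (var input in inputs)
        {
            executeBlock.Post(input);
        }

        // Completing the first block propagates through the links to the last one
        executeBlock.Complete();
        await exportBlock.Completion;

        return batches;
    }

    public static async Task Main()
    {
        var result = await RunAsync(new[] { "a", "bb", "ccc", "dddd", "eeeee" }, 2);
        foreach (var batch in result)
        {
            Console.WriteLine(string.Join(", ", batch));
        }
    }
}
```

Because completion propagates along the links, you only call Complete() on the first block and await the last one.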
I use such pipelines to call external services, parse the responses, generate the final records, batch them, and send them to the database with a block that uses SqlBulkCopy.