What I have at the moment is a timer that fires every 5000 ms:
// Single shared timer; its Elapsed event triggers OnTimedEvent.
static Timer _aTimer = new System.Timers.Timer();

/// <summary>
/// Console entry point: subscribes <c>OnTimedEvent</c> to the timer, arms the timer
/// with a 5000 ms interval, and then reads console input until a 'q' is read.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    _aTimer.Interval = 5000;
    _aTimer.Elapsed += OnTimedEvent;
    _aTimer.Enabled = true;

    Console.WriteLine("Press \'q\' to quit the sample.");

    // Nothing else happens on this thread: keep reading characters until 'q' arrives.
    int key;
    do
    {
        key = Console.Read();
    }
    while (key != 'q');
}
When it fires, it sets up the queues for processing the files:
/// <summary>
/// Timer callback: pauses the timer, builds one <c>IncomingOrderQueue</c> per account
/// from the files returned by <c>OrderFiles</c>, consumes all queues in parallel, and
/// finally re-arms the timer.
/// </summary>
/// <param name="source">The object that raised the event (not used).</param>
/// <param name="e">Elapsed event data (not used).</param>
private static void OnTimedEvent(object source, ElapsedEventArgs e)
{
    // Stop the timer so we don't reprocess files we already have in the queue.
    StopTimer();
    try
    {
        // Accounts whose folders are scanned for files.
        var accounts = new List<string>() { "Account1", "Account2" };

        // One queue per account.
        var lists = new List<IncomingOrderQueue>();
        foreach (var acc in accounts)
        {
            var tmp = new IncomingOrderQueue();

            // Every file found for the account becomes one work item in its queue.
            foreach (var orderFile in OrderFiles(acc))
            {
                tmp.EnqueueSweep(new QueueVariables() { Account = acc, File = orderFile });
            }

            lists.Add(tmp);
        }

        // Consume the contents of all queues; returns once every queue has been consumed.
        Parallel.ForEach(lists, l => l.Consume());
    }
    finally
    {
        // Re-arm the timer even if this pass failed; without this, an exception thrown
        // while collecting or processing files would leave the timer stopped and no
        // further passes would ever run.
        StartTimer();
    }
}
/// <summary>
/// Writes "Stop Timer" to the console and stops the shared timer so it raises no
/// further Elapsed events until it is started again.
/// </summary>
public static void StopTimer()
{
    Console.WriteLine("Stop Timer");

    // Timer.Stop() works by setting Enabled to false, so a separate
    // "Enabled = false" assignment afterwards is redundant.
    _aTimer.Stop();
}
/// <summary>
/// Writes "Start Timer" to the console and starts the shared timer so it resumes
/// raising Elapsed events.
/// </summary>
public static void StartTimer()
{
    Console.WriteLine("Start Timer");

    // Timer.Start() works by setting Enabled to true, so a separate
    // "Enabled = true" assignment beforehand is redundant.
    _aTimer.Start();
}
The blocking queue itself:
/// <summary>
/// Wraps a <c>BlockingCollection</c> of <c>QueueVariables</c>: items are added with
/// <see cref="EnqueueSweep"/> and later processed in parallel by <see cref="Consume"/>.
/// </summary>
public class IncomingOrderQueue
{
    // Backing store for the pending work items.
    private BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>();

    /// <summary>Number of items currently held in the queue.</summary>
    public int QueueCount
    {
        get { return _orderQ.Count; }
    }

    /// <summary>Adds one work item to the queue.</summary>
    /// <param name="incoming">The item to enqueue.</param>
    public void EnqueueSweep(QueueVariables incoming)
    {
        _orderQ.Add(incoming);
    }

    /// <summary>
    /// Marks the queue as complete for adding and then hands every queued item to
    /// <c>Processor.Order.Object</c> through <c>Parallel.ForEach</c>.
    /// </summary>
    public void Consume()
    {
        // Completing first means the consuming enumerable ends once the queue is drained
        // instead of waiting for more items.
        _orderQ.CompleteAdding();

        var pending = _orderQ.GetConsumingEnumerable();
        Parallel.ForEach(pending, Processor.Order.Object);
    }
}
What I have works as it should: start the timer -> stop the timer -> trigger the process that collects all the files within the folders -> process all the files -> restart the timer.
I can't help but think there is a better way to do what I'm doing, especially when the number of queues that will be created for the accounts is 200 - 400.
Thanks
I think you don't need to stop and start your producers and consumers. The `BlockingCollection` can block the producers if it reaches its maximum capacity and block the consumers if it is empty.
I'd also probably start off with a single `BlockingCollection` until profiling shows that I need another one. Depending on the relative speed of your producers and consumers, you may need to tweak their numbers. If they are IO bound, they should be asynchronous and you can have many; if they are CPU bound, you probably won't need more than the number of processors available.
I've redone your example assuming IO-bound producers and consumers; I hope it gives you some ideas. It fires off the producers at 10-second intervals and can keep going until you cancel production via the `CancellationToken`. Only after you have cancelled and completed the production do you call `CompleteAdding` to release the blocked consumers.
/// <summary>
/// One unit of work placed on the order queue: an account paired with a file.
/// </summary>
public class QueueVariables
{
    /// <summary>Account the work item belongs to.</summary>
    public string Account { get; set; }

    /// <summary>File to be processed for that account.</summary>
    public string File { get; set; }
}
/// <summary>
/// Builds the sample set of accounts to produce work for.
/// </summary>
/// <returns>
/// A new queue containing "Account1" through "Account12", in that order.
/// </returns>
public static ConcurrentQueue<string> GetACcounts()
{
    var accountQueue = new ConcurrentQueue<string>();
    for (int number = 1; number <= 12; number++)
    {
        accountQueue.Enqueue("Account" + number);
    }
    return accountQueue;
}
/// <summary>
/// Builds the sample list of files for an account.
/// </summary>
/// <param name="acct">Account to list files for; this sample ignores it.</param>
/// <returns>
/// A new list containing "File1" through "File12", in that order.
/// </returns>
public static List<string> GetFiles(string acct)
{
    var files = new List<string>();
    for (int number = 1; number <= 12; number++)
    {
        files.Add("File" + number);
    }
    return files;
}
/// <summary>
/// Repeatedly runs a batch of producers, waiting <paramref name="period"/> between
/// batches, until cancellation is requested.
/// </summary>
/// <param name="numProducers">Number of producer tasks started for each batch.</param>
/// <param name="period">Pause between the end of one batch and the start of the next.</param>
/// <param name="ct">
/// Token checked before each batch and passed to both the producers and the pause.
/// </param>
/// <returns>A task that completes when the loop stops.</returns>
public static async Task StartPeriodicProducers(int numProducers, TimeSpan period, CancellationToken ct)
{
    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        // Kick off one batch and wait until every producer in it has finished.
        var batch = StartProducers(numProducers, ct);
        await Task.WhenAll(batch);

        // Pause before the next batch; the delay observes the same token.
        Console.WriteLine("***Waiting " + period);
        await Task.Delay(period, ct);
    }
}
/// <summary>
/// Starts <paramref name="numProducers"/> producer tasks that share one queue of
/// accounts. Each task repeatedly takes an account and adds one
/// <c>QueueVariables</c> item per file of that account to <c>_orderQ</c>.
/// </summary>
/// <param name="numProducers">Number of producer tasks to start.</param>
/// <param name="ct">
/// Token that stops production; it is observed between accounts, while waiting to add
/// to the queue, and during the simulated production delay.
/// </param>
/// <returns>The started producer tasks.</returns>
public static List<Task> StartProducers(int numProducers, CancellationToken ct)
{
    List<Task> producingTasks = new List<Task>();
    var accounts = GetACcounts();
    for (int i = 0; i < numProducers; i++)
    {
        producingTasks.Add(Task.Run(async () =>
        {
            string acct;
            // Check for cancellation before dequeuing so that no account is taken
            // off the shared queue and then silently dropped.
            while (!ct.IsCancellationRequested && accounts.TryDequeue(out acct))
            {
                foreach (var file in GetFiles(acct))
                {
                    // Add blocks while the bounded queue is full; passing the token lets a
                    // blocked producer stop promptly when production is cancelled.
                    _orderQ.Add(new UserQuery.QueueVariables { Account = acct, File = file }, ct);
                    Console.WriteLine("Produced Account:{0} File:{1}", acct, file);
                    await Task.Delay(50, ct); // simulate production delay
                }
            }
            Console.WriteLine("Finished producing");
        }));
    }
    return producingTasks;
}
/// <summary>
/// Starts <paramref name="numConsumers"/> consumer tasks. Each task takes items from
/// <c>_orderQ</c> until the queue is marked complete for adding and is empty, then
/// writes "Finished Consuming".
/// </summary>
/// <param name="numConsumers">Number of consumer tasks to start.</param>
/// <returns>The started consumer tasks.</returns>
public static List<Task> StartConsumers(int numConsumers)
{
    List<Task> consumingTasks = new List<Task>();
    for (int j = 0; j < numConsumers; j++)
    {
        consumingTasks.Add(Task.Run(async () =>
        {
            // The consuming enumerable blocks while the queue is empty and ends once
            // adding is complete and the queue is drained. This replaces catching
            // InvalidOperationException from Take(), which would also have swallowed an
            // unrelated InvalidOperationException thrown while processing an item.
            foreach (var queueVar in _orderQ.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumed Account:{0} File:{1}", queueVar.Account, queueVar.File);
                await Task.Delay(200); // simulate consumption delay
            }
            Console.WriteLine("Finished Consuming");
        }));
    }
    return consumingTasks;
}
/// <summary>
/// Runs the sample: starts periodic producers (2 per batch, 10 seconds between
/// batches) and 4 consumers, lets them run for 120 seconds, cancels production, marks
/// the queue complete, and waits for the consumers to finish.
/// </summary>
/// <returns>A task that completes when all consumers have finished.</returns>
private static async Task MainAsync()
{
    // The token source is disposable; release it once production has been shut down.
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        var periodicProducers = StartPeriodicProducers(2, TimeSpan.FromSeconds(10), cts.Token);
        var consumingTasks = StartConsumers(4);

        await Task.Delay(TimeSpan.FromSeconds(120));

        // stop production
        cts.Cancel();
        try
        {
            // wait for producers to finish producing
            await periodicProducers;
        }
        catch (OperationCanceledException)
        {
            // operation was cancelled
        }
        finally
        {
            // Complete adding to release blocked consumers. Doing this in finally means
            // the consumers are also released if production failed with an exception
            // other than cancellation.
            _orderQ.CompleteAdding();
        }

        // wait for consumers to finish consuming
        await Task.WhenAll(consumingTasks);
    }
}
// Shared work queue bounded to 10 items: once that capacity is reached,
// producers block until an item has been taken.
private static BlockingCollection<QueueVariables> _orderQ =
    new BlockingCollection<QueueVariables>(boundedCapacity: 10);
/// <summary>
/// Script entry point: runs <c>MainAsync</c> to completion, then waits for a line of
/// console input before returning.
/// </summary>
void Main()
{
    // Block here until the whole asynchronous run has finished.
    var run = MainAsync();
    run.Wait();

    Console.ReadLine();
}
// Define other methods and classes here