Search code examples
Tags: c#, multithreading, blockingcollection

Creating a file pickup process with a Blocking Collection


What I have at the moment is a timer that fires every 5000 ms:

// Shared polling timer. Fully qualified so it cannot become ambiguous with
// System.Threading.Timer when both namespaces are imported; readonly because the
// instance is never replaced (it is only stopped and restarted).
private static readonly System.Timers.Timer _aTimer = new System.Timers.Timer();

    /// <summary>
    /// Starts the 5-second polling timer and keeps the process alive until the user
    /// types 'q'.
    /// </summary>
    static void Main(string[] args)
    {
        _aTimer.Elapsed += OnTimedEvent;

        _aTimer.Interval = 5000;
        _aTimer.Enabled = true;

        Console.WriteLine("Press \'q\' to quit the sample.");

        int key;
        while ((key = Console.Read()) != 'q')
        {
            if (key == -1)
            {
                // End of stdin (redirected or closed input): Console.Read keeps returning -1
                // immediately, so the previous empty-statement loop busy-spun a CPU core.
                // Park the main thread instead; the timer keeps firing on pool threads and the
                // process stays alive exactly as before, just without burning CPU.
                System.Threading.Thread.Sleep(System.Threading.Timeout.Infinite);
            }
        }
    }

When it fires, it sets up the queues for processing the files:

        /// <summary>
        /// Timer callback. Pauses the timer, builds one IncomingOrderQueue per account from
        /// the files returned by OrderFiles, and drains all queues in parallel. It then
        /// restarts the timer, also when processing failed.
        /// </summary>
        private static void OnTimedEvent(object source, ElapsedEventArgs e)
    {
        // stop the timer so we don't reprocess files we already have in the queue
        StopTimer();

        try
        {
            var queues = new List<IncomingOrderQueue>();
            // the accounts whose folders are scanned for order files
            var accounts = new List<string>() { "Account1", "Account2" };
            foreach (var acc in accounts)
            {
                var queue = new IncomingOrderQueue();
                foreach (var orderFile in OrderFiles(acc))
                {
                    queue.EnqueueSweep(new QueueVariables() { Account = acc, File = orderFile });
                }
                queues.Add(queue);
            }

            // consume every queue; returns once all queued files have been processed
            Parallel.ForEach(queues, q => q.Consume());
        }
        catch (Exception ex)
        {
            // System.Timers.Timer catches and suppresses exceptions thrown from Elapsed
            // handlers, so without this the failure would be invisible.
            Console.WriteLine("Order pickup failed: " + ex);
        }
        finally
        {
            // Always restart: previously a single exception skipped StartTimer() and left
            // the timer stopped for good, silently ending all further pickups.
            StartTimer();
        }
    }

        /// <summary>Pauses the polling timer so no further Elapsed events are raised.</summary>
        public static void StopTimer()
    {
        Console.WriteLine("Stop Timer");
        // Timer.Stop() is defined as setting Enabled to false, so one call is sufficient.
        _aTimer.Stop();
    }

    /// <summary>Resumes the polling timer after a processing pass.</summary>
    public static void StartTimer()
    {
        Console.WriteLine("Start Timer");
        // Timer.Start() is defined as setting Enabled to true, so no separate assignment is needed.
        _aTimer.Start();
    }

The blocking queue itself:

 /// <summary>
 /// Per-account work queue. Files are added with EnqueueSweep and then processed in
 /// parallel by Consume.
 /// </summary>
 public class IncomingOrderQueue 
{
    // Created once and never replaced.
    private readonly BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>();

    /// <summary>Adds one file to the queue.</summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown by BlockingCollection if called after Consume has marked the queue complete.
    /// </exception>
    public void EnqueueSweep(QueueVariables incoming)
    {
        _orderQ.Add(incoming);
    }

    /// <summary>
    /// Marks the queue as complete for adding and processes every queued item in parallel
    /// with Processor.Order.Object. Returns once the queue has been drained.
    /// </summary>
    public void Consume()
    {
        // stop anything else being added so the consuming enumeration can finish
        _orderQ.CompleteAdding();

        // Parallel.ForEach's default partitioning pulls items from the enumerable in growing
        // chunks and buffers them per worker, which balances long-running per-file work poorly.
        // NoBuffering makes each worker take exactly one item at a time.
        var oneAtATime = Partitioner.Create(
            _orderQ.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);
        Parallel.ForEach(oneAtATime, Processor.Order.Object);
    }

    /// <summary>Number of items currently held in the queue.</summary>
    public int QueueCount
    {
        get { return _orderQ.Count; }
    }
}

What I have works as it should: start the timer -> stop the timer -> collect all the files within the folders -> process all the files -> restart the timer.

I can't help but think there is a better way to do what I'm doing, especially since the number of queues created for the accounts is going to be 200–400.

Thanks


Solution

  • I think you don't need to stop and start your producers and consumers. The BlockingCollection can block the producers if it reaches a maximum capacity and block the consumers if it is empty.

    I'd also probably start off with a single BlockingCollection until profiling shows that I need another one. Depending on the relative speed of your producers and consumers, you may need to tweak how many of each you run. If they are I/O-bound, they should be asynchronous and you can have many; if they are CPU-bound, you probably won't need more than the number of available processors.

    I've redone your example assuming I/O-bound producers and consumers; I hope it gives you some ideas. It starts the producers at 10-second intervals and keeps going until you cancel production via the CancellationToken. Only after production has been cancelled and has completed do you call CompleteAdding to release the blocked consumers.

    /// <summary>
    /// One unit of work for the order queue: a file belonging to an account.
    /// </summary>
    public class QueueVariables
    {
        /// <summary>The account the file belongs to.</summary>
        public string Account { get; set; }

        /// <summary>The file to process.</summary>
        public string File { get; set; }
    }
    
    /// <summary>
    /// Builds a new queue holding the demo account names "Account1" through "Account12",
    /// in that order. A fresh queue is returned on every call.
    /// </summary>
    public static ConcurrentQueue<string> GetACcounts()
    {
        string[] accountNames =
        {
            "Account1", "Account2", "Account3", "Account4",
            "Account5", "Account6", "Account7", "Account8",
            "Account9", "Account10", "Account11", "Account12",
        };

        return new ConcurrentQueue<string>(accountNames);
    }
    
    /// <summary>
    /// Returns a new list with the demo file names "File1" through "File12".
    /// The <paramref name="acct"/> argument is not used, so every account gets the same list.
    /// </summary>
    public static List<string> GetFiles(string acct)
    {
        string[] fileNames =
        {
            "File1", "File2", "File3", "File4",
            "File5", "File6", "File7", "File8",
            "File9", "File10", "File11", "File12",
        };

        var files = new List<string>();
        foreach (var name in fileNames)
        {
            files.Add(name);
        }
        return files;
    }
    
    /// <summary>
    /// Repeatedly runs a production round with <paramref name="numProducers"/> producers. Each
    /// round is awaited to completion, followed by a pause of <paramref name="period"/>. This
    /// continues until <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// Propagates when cancellation interrupts the pause or a producer (Task.Delay with a
    /// cancelled token throws TaskCanceledException).
    /// </exception>
    public static async Task StartPeriodicProducers(int numProducers, TimeSpan period, CancellationToken ct)
    {
        while (true)
        {
            if (ct.IsCancellationRequested)
            {
                return;
            }

            // run one production round and wait until every producer has finished
            List<Task> round = StartProducers(numProducers, ct);
            await Task.WhenAll(round.ToArray());

            // pause before the next round
            Console.WriteLine("***Waiting " + period);
            await Task.Delay(period, ct);
        }
    }
    
    /// <summary>
    /// Starts <paramref name="numProducers"/> tasks that share one queue of accounts. Each task
    /// repeatedly takes the next account and adds one QueueVariables per file to _orderQ. It
    /// stops when the accounts run out or <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <returns>The started (not yet awaited) producer tasks.</returns>
    public static List<Task> StartProducers(int numProducers, CancellationToken ct)
    {
        List<Task> producingTasks = new List<Task>();
        var accounts = GetACcounts();

        for (int i = 0; i < numProducers; i++)
        {
            producingTasks.Add(Task.Run(async () =>
            {
                string acct;
                // Check cancellation before dequeuing, so a cancelled producer does not pull
                // an account off the shared queue and then drop it unprocessed.
                while (!ct.IsCancellationRequested && accounts.TryDequeue(out acct))
                {
                    foreach (var file in GetFiles(acct))
                    {
                        // Pass ct: when _orderQ is bounded and full, Add blocks, and without the
                        // token a producer stuck here would not observe cancellation.
                        // Plain QueueVariables (not UserQuery.QueueVariables) also compiles
                        // outside LINQPad.
                        _orderQ.Add(new QueueVariables { Account = acct, File = file }, ct);
                        Console.WriteLine("Produced Account:{0} File:{1}", acct, file);
                        await Task.Delay(50, ct); // simulate production delay
                    }
                }

                Console.WriteLine("Finished producing");
            }));
        }

        return producingTasks;
    }
    
    /// <summary>
    /// Starts <paramref name="numConsumers"/> tasks that take items from _orderQ. Each task
    /// stops once CompleteAdding has been called and the collection is empty.
    /// </summary>
    /// <returns>The started (not yet awaited) consumer tasks.</returns>
    public static List<Task> StartConsumers(int numConsumers)
    {
        List<Task> consumingTasks = new List<Task>();

        for (int j = 0; j < numConsumers; j++)
        {
            consumingTasks.Add(Task.Run(async () =>
            {
                // GetConsumingEnumerable blocks while the collection is empty. It ends the loop
                // once adding is complete and every item has been taken. This replaces the
                // previous Take() loop, which stopped only because Take threw
                // InvalidOperationException. That loop also treated any unrelated
                // InvalidOperationException from its body as a normal finish.
                foreach (var queueVar in _orderQ.GetConsumingEnumerable())
                {
                    Console.WriteLine("Consumed Account:{0} File:{1}", queueVar.Account, queueVar.File);
                    await Task.Delay(200); // simulate consumption delay
                }

                Console.WriteLine("Finished Consuming");
            }));
        }

        return consumingTasks;
    }
    
    /// <summary>
    /// Demo driver. Runs the periodic producers (2 producers, every 10 seconds) and 4 consumers
    /// for 120 seconds, then cancels production. It then completes the queue so the consumers
    /// can drain it, and waits for them to finish.
    /// </summary>
    private static async Task MainAsync()
    {
        // CancellationTokenSource is IDisposable; release it once the demo is done.
        using (var cts = new CancellationTokenSource())
        {
            var periodicProducers = StartPeriodicProducers(2, TimeSpan.FromSeconds(10), cts.Token);
            var consumingTasks = StartConsumers(4);

            await Task.Delay(TimeSpan.FromSeconds(120));

            // stop production
            cts.Cancel();

            try
            {
                // wait for producers to finish producing
                await periodicProducers;
            }
            catch (OperationCanceledException)
            {
                // expected: cancellation interrupts the producers' delays
            }
            finally
            {
                // Complete adding even if production failed with some other exception.
                // Otherwise the consumers would stay blocked on the empty collection forever.
                _orderQ.CompleteAdding();
            }

            // wait for consumers to drain the remaining items and finish
            await Task.WhenAll(consumingTasks.ToArray());
        }
    }
    
    // Shared work queue with a maximum capacity of 10: once that capacity is reached,
    // Add blocks the producers until a consumer takes an item. readonly because the
    // collection is never replaced.
    private static readonly BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>(10);
    
    // LINQPad-style entry point: runs the async demo to completion, then waits for Enter.
    void Main()
    {
        Task demo = MainAsync();
        demo.Wait();

        Console.ReadLine();
    }
    
    // Define other methods and classes here