Search code examples
c#multithreadingproducer-consumer

Producer producing data every one second and Consumer consuming after every minute


I am trying to write a program using BlockingCollection producer/ consumer pattern where producer will keep on producing data every one second and consumer consuming it and displaying some processed data from my Producer data on console window every 60 seconds.

The real life scenario is - I will be getting Open High Low Close stock data in the producer every second which I would like to pass to consumer threads which would create 1 minute OHLC data from 60 seconds data which I will receive from third party. For simulating real life scenario, I am trying to create a producer which would have timer which would keep on putting data in BlockingCollection every second but not sure how to use timer in the producer. Can anyone suggest with code

  1. How would I create Producer which would keep on producing OHLC data every second using timer inside the producer. Lets say my OHLC value would have counter which I will be using for counting second. For example O=1,H=1,L=1,C=1 O=2,H=2,L=2,C=2 .... O=60,H=60,L=60,C=60

As soon as the counter reaches multiple of 60, my Consumer should kick-in which would take that 60 second data to create one minute OHLC data. 2. Where would CompleteAdding method be written as my Producer is never ending Producer which will only be stopped once I stop entire application

Any help in this regard is greatly appreciated. Thanks in advance


Solution

  • In this example two tasks are created: producer and consumer. In the first, data is generated every second and placed in the collection. In the second, the data is extracted from the collection and processed every minute.

    var produced = new BlockingCollection<Price>();
    
    var producer = Task.Run(async () =>
    {
        try
        {
            var random = new Random();
            while (true)
            {
                produced.Add(new Price { Low = random.Next(500), High = random.Next(500, 1000) });
                await Task.Delay(1000);
            }
        }
        finally
        {
            produced.CompleteAdding();
        }
    });
    
    var consumer = Task.Run(async () =>
    {
        const int interval = 60; // seconds
        var values = new List<Price>();
    
        foreach (var value in produced.GetConsumingEnumerable())
        {
            values.Add(value);
    
            if (DateTime.UtcNow.Second % interval == 0)
            {
                Console.WriteLine(values.Average(p => p.High)); // do some work
                values.Clear();
            }
        }
    });
    
    Task.WaitAll(producer, consumer);