c# multithreading thread-safety immutable-collections

C# Immutable counter for multiple fields


I have a fairly high-throughput message counter (tens of thousands of messages per second) and am looking for an efficient way of reading the counts without putting locks everywhere. Ideally I would not lock on every message count when I only report an update every 10 seconds.

Use of immutable counter object

I am using an immutable counter class:

public class Counter
{
    public Counter(int quotes, int trades)
    {
        Quotes = quotes;
        Trades = trades;
    }

    public readonly int Quotes;
    public readonly int Trades;
    // and some other counter fields snipped
}

I would replace it with a new instance on each pass of the message-processing loop:

class MyProcessor
{
    System.Timers.Timer timer;
    Counter counter = new Counter(0,0);

    public MyProcessor()
    {
       // update every 10 seconds
       this.timer = new System.Timers.Timer(10000);

       timer.Elapsed += (sender, e) => {
          var quotesPerSecond = this.counter.Quotes / 10.0;
          var tradesPerSecond = this.counter.Trades / 10.0;
          // report the rates, then start the next window from zero
          this.counter = new Counter(0,0);
       };
    }

    public void ProcessMessages(Messages messages)
    {
       foreach(var message in messages) { /* */ }

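       // publish a new immutable snapshot with this batch's counts added
       // (assumes Messages exposes Quotes and Trades collections, as in the later snippet)
       var oldCounter = counter;
       this.counter = new Counter(
           oldCounter.Quotes + messages.Quotes.Count,
           oldCounter.Trades + messages.Trades.Count);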
    }
}

I have lots of counters (not all shown), so the alternative would mean a lot of individual Interlocked.Increment calls, one per counter field.

The only other way I can think of is to lock on every single run of ProcessMessages, which would be expensive. That seems heavy for something that is a utility rather than critical functionality where a failure would crash the program.

Is it possible to use an immutable counter object in this fashion, without heavyweight interlocking or threading mechanisms, when we only need to report once every 10 seconds?
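
One lock-free way to keep the immutable counter is to swap the reference atomically rather than assign it directly. The following is a minimal sketch, not a definitive implementation: the names CounterSnapshot and SnapshotCounter are hypothetical (they are not in the original code), and only two of the counter fields are shown. The processing thread retries a compare-and-swap if the timer replaced the snapshot in between, and the timer takes the totals and resets them in a single atomic exchange, so no counts are lost between the read and the reset.

```csharp
using System.Threading;

// Immutable snapshot: every "update" produces a new instance.
public sealed class CounterSnapshot
{
    public static readonly CounterSnapshot Zero = new CounterSnapshot(0, 0);

    public CounterSnapshot(int quotes, int trades)
    {
        Quotes = quotes;
        Trades = trades;
    }

    public int Quotes { get; }
    public int Trades { get; }

    public CounterSnapshot Add(int quotes, int trades) =>
        new CounterSnapshot(Quotes + quotes, Trades + trades);
}

// Hypothetical holder for the current snapshot; not part of the original MyProcessor.
public sealed class SnapshotCounter
{
    private CounterSnapshot current = CounterSnapshot.Zero;

    // Called from the processing thread, once per batch.
    public void Add(int quotes, int trades)
    {
        CounterSnapshot seen, next;
        do
        {
            seen = Volatile.Read(ref current);
            next = seen.Add(quotes, trades);
            // The swap only succeeds if nobody replaced the snapshot since we read it.
        }
        while (Interlocked.CompareExchange(ref current, next, seen) != seen);
    }

    // Called from the timer thread: take the totals and start again from zero, atomically.
    public CounterSnapshot TakeAndReset() =>
        Interlocked.Exchange(ref current, CounterSnapshot.Zero);
}
```

With a single processing thread the loop almost never retries, so the cost is one allocation and one interlocked operation per batch, regardless of how many counter fields the snapshot carries.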

Flag check idea to avoid locks

Could the timer thread set a flag for ProcessMessages to check, so that when ProcessMessages sees the flag set it starts the count from zero again? For example:

/* snipped the MyProcessor class, same as before */

System.Timers.Timer timer;
Counter counter = new Counter(0,0);
ManualResetEvent reset = new ManualResetEvent(false);

public MyProcessor()
{
   // update every 10 seconds
   this.timer = new System.Timers.Timer(10000);

   timer.Elapsed += (sender, e) => {
      var quotesPerSecond = this.counter.Quotes / 10.0;
      var tradesPerSecond = this.counter.Trades / 10.0;
      // log
      this.reset.Set();
   };
}

// this should be called every second with a heartbeat message posted to queue
public void ProcessMessages(Messages messages)
{
   if (reset.WaitOne(0) == true)
   {
      // the timer has reported: start the count from zero again with this batch
      this.counter = new Counter(messages.Quotes.Count, messages.Trades.Count);
      reset.Reset();
   }
   else
   {
      this.counter = new Counter(
                        this.counter.Quotes + messages.Quotes.Count,
                        this.counter.Trades + messages.Trades.Count);
   }
}

/* end of MyProcessor class */

This would work; however, the update "stalls" whenever message processing comes to a halt. Although the throughput is normally very high, the feed pauses for a number of hours at night, and ideally the report should show the actual rate during that time rather than the last value.

One way around this would be to post a heartbeat message to MyProcessor.ProcessMessages() every second, forcing an internal update of the message counters and a subsequent reset whenever the reset ManualResetEvent is set.
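
The flag itself does not need a kernel event object. As a hedged sketch, assuming a hypothetical ResetFlag helper (not in the original code) in place of the ManualResetEvent, the check and the clear can be done in one atomic step, so a request set by the timer cannot be lost between WaitOne(0) and Reset():

```csharp
using System.Threading;

// Hypothetical helper; stands in for the ManualResetEvent in the snippet above.
public sealed class ResetFlag
{
    private int requested; // 0 = no reset requested, 1 = reset requested

    // Timer thread: ask the processing thread to restart its count.
    public void Request() => Interlocked.Exchange(ref requested, 1);

    // Processing thread: returns true at most once per request and clears
    // the flag in the same atomic operation.
    public bool TryConsume() => Interlocked.Exchange(ref requested, 0) == 1;
}
```

ProcessMessages would then branch on TryConsume() instead of reset.WaitOne(0), and the explicit reset.Reset() call is no longer needed.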


Solution

  • I wanted to update everyone with what I came up with: the counter updates are now done within the processing thread itself.

    Everything is driven by the DequeueThread loop, and specifically by the call to this.queue.ReceiveAsync(TimeSpan.FromSeconds(UpdateFrequencySeconds)).

    This call either returns an item from the queue, which is then processed before the counters are updated, or it times out, in which case the counters are updated anyway. No other threads are involved: everything, including updating the message rate, is done within the dequeue loop.

    In summary, nothing runs in parallel as far as dequeuing is concerned: the loop fetches one item at a time, processes it, updates the counters, and then loops back for the next item in the queue.

    This removes the need for synchronisation:

    internal class Counter
    {
        public Counter(Action<int,int,int,int> updateCallback, double updateEvery)
        {
            this.updateCallback = updateCallback;
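            this.UpdateEvery = updateEvery;
            // start the first reporting window now, so the first Poll() does not report immediately
            this.nextUpdate = DateTimeOffset.UtcNow.AddSeconds(updateEvery);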
        }
    
        public void Poll()
        {
            if (nextUpdate < DateTimeOffset.UtcNow)
            {
                // post the stats, and reset
                this.updateCallback(this.quotes, this.trades, this.aggregates, this.statuses);
                this.quotes = 0;
                this.trades = 0;
                this.aggregates = 0;
                this.statuses = 0;
                nextUpdate = DateTimeOffset.UtcNow.AddSeconds(this.UpdateEvery);
            }
        }
    
        public void AddQuotes(int count) => this.quotes += count;
        public void AddTrades(int count) => this.trades += count;
        public void AddAggregates(int count) => this.aggregates += count;
        public void AddStatuses(int count) => this.statuses += count;
    
        private int quotes;
        private int trades;
        private int aggregates;
        private int statuses;
    
        private readonly Action<int,int,int,int> updateCallback;
        public double UpdateEvery { get; private set; }
        private DateTimeOffset nextUpdate;
    }
    
    public class DeserializeWorker
    {
        private readonly BufferBlock<byte[]> queue = new BufferBlock<byte[]>();
        private readonly IPolygonDeserializer polygonDeserializer;
        private readonly ILogger<DeserializeWorker> logger;
    
        private readonly Counter counter; 
        const double UpdateFrequencySeconds = 5.0;        
        long maxBacklog = 0;
    
        public DeserializeWorker(IPolygonDeserializer polygonDeserializer, ILogger<DeserializeWorker> logger)
        {
            this.polygonDeserializer = polygonDeserializer ?? throw new ArgumentNullException(nameof(polygonDeserializer));
            this.logger = logger;
            this.counter = new Counter(ProcesCounterUpdateCallback, UpdateFrequencySeconds);
        }
    
        public void Add(byte[] data)
        {
            this.queue.Post(data);
        }
    
        public Task Run(CancellationToken stoppingToken)
        {
            return Task
                    .Factory
                    .StartNew(
                        async () => await DequeueThread(stoppingToken),
                        stoppingToken,
                        TaskCreationOptions.LongRunning,
                        TaskScheduler.Default)
                    .Unwrap();
        }
    
        private async Task DequeueThread(CancellationToken stoppingToken)
        {
            while (stoppingToken.IsCancellationRequested == false)
            {
                try
                {
                    var item = await this.queue.ReceiveAsync(TimeSpan.FromSeconds(UpdateFrequencySeconds), stoppingToken);
                    await ProcessAsync(item);
                }
                catch (TimeoutException)
                {
                    // this is ok, timeout expired 
                }
                catch (OperationCanceledException) // also covers TaskCanceledException
                {
                    break; // task cancelled, break from loop
                }
                catch (Exception e)
                {
                    this.logger.LogError(e.ToString());
                }
    
                UpdateCounters();
            }
    
            await StopAsync();
        }
    
    
        protected async Task StopAsync()
        {
            this.queue.Complete();
            await this.queue.Completion;
        }
    
        protected void ProcessStatuses(IEnumerable<Status> statuses)
        {
            Parallel.ForEach(statuses, (current) =>
            {
                if (current.Result != "success")
                    this.logger.LogInformation($"{current.Result}: {current.Message}");
            });
        }
    
        protected void ProcessMessages<T>(IEnumerable<T> messages)
        {
            Parallel.ForEach(messages, (current) =>
            {
                // serialize by type T
                // dispatch
            });
        }
    
        async Task ProcessAsync(byte[] item)
        {
            try
            {
                var memoryStream = new MemoryStream(item);
                var message = await this.polygonDeserializer.DeserializeAsync(memoryStream);
    
                var messagesTask = Task.Run(() => ProcessStatuses(message.Statuses));
                var quotesTask = Task.Run(() => ProcessMessages(message.Quotes));
                var tradesTask = Task.Run(() => ProcessMessages(message.Trades));
                var aggregatesTask = Task.Run(() => ProcessMessages(message.Aggregates));
    
                this.counter.AddStatuses(message.Statuses.Count);
                this.counter.AddQuotes(message.Quotes.Count);
                this.counter.AddTrades(message.Trades.Count);
                this.counter.AddAggregates(message.Aggregates.Count);
    
                // await instead of blocking inside an async method
                await Task.WhenAll(messagesTask, quotesTask, aggregatesTask, tradesTask);
            }
            catch (Exception e)
            {
                this.logger.LogError(e.ToString());
            }
        }
    
        void UpdateCounters()
        {
            var currentCount = this.queue.Count;
            if (currentCount > this.maxBacklog)
                this.maxBacklog = currentCount;
    
            this.counter.Poll();
        }
    
        void ProcesCounterUpdateCallback(int quotes, int trades, int aggregates, int statuses)
        {
            var updateFrequency = this.counter.UpdateEvery;
            logger.LogInformation(
                $"Queue current {this.queue.Count} (max {this.maxBacklog }), {quotes / updateFrequency} quotes/sec, {trades / updateFrequency} trades/sec, {aggregates / updateFrequency} aggregates/sec, {statuses / updateFrequency} status/sec");
        }
    }
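
One detail of the polling approach above: Poll() can fire later than the nominal interval (for example after a slow ProcessAsync call), so dividing by UpdateEvery can overstate the rate. The following is a trimmed-down sketch, assuming a hypothetical PollingCounter with an injectable clock and a single counter field (neither is in the original code), that divides by the time actually elapsed. The injected clock also makes the behaviour easy to check without waiting.

```csharp
using System;

// Hypothetical, trimmed-down variant of the Counter above with an injectable clock.
public sealed class PollingCounter
{
    private readonly Func<DateTimeOffset> clock;
    private readonly TimeSpan interval;
    private readonly Action<int, double> report; // (count, elapsed seconds)
    private DateTimeOffset windowStart;
    private int quotes;

    public PollingCounter(Func<DateTimeOffset> clock, TimeSpan interval, Action<int, double> report)
    {
        this.clock = clock;
        this.interval = interval;
        this.report = report;
        this.windowStart = clock();
    }

    public void AddQuotes(int count) => this.quotes += count;

    // Must be called from the same logical flow as AddQuotes; no synchronisation is used.
    public void Poll()
    {
        var now = this.clock();
        var elapsed = now - this.windowStart;
        if (elapsed < this.interval)
            return; // window still open, nothing to report yet

        // Report the count together with the real window length, then start a new window.
        this.report(this.quotes, elapsed.TotalSeconds);
        this.quotes = 0;
        this.windowStart = now;
    }
}
```

In production the clock argument would simply be () => DateTimeOffset.UtcNow, and the callback would compute count / elapsedSeconds for the log line.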