Tags: c#, sql-server, azure, stockquotes

How to process a high-speed stream of data when only the "last" value per symbol needs to be stored in a database (C#)


I have a high-speed stream of stock prices coming from a vendor: roughly 5,000 prices per second across about 8,000 different symbols.

I have a table (SymbolPrice) in my database that needs to hold the most recent price for each symbol.

I can't get the database updates to run fast enough to keep up with the queue of last prices.

I am on an Azure SQL Server database, so I was able to upgrade to a premium tier that supports In-Memory tables and made SymbolPrice an In-Memory table, but it is still not fast enough.

Skipping a price is not a problem, as long as the most recent price gets written as quickly as possible. If I get blasted with 10 updates in a row for one symbol, only the last one needs to be written. This sounds easy, except that those 10 updates may be intermixed with updates for other symbols.
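As a minimal sketch of that "last value wins" behavior (the `Tick` record and `LastValue.Coalesce` helper are hypothetical names, not part of the vendor feed), writing interleaved ticks into a dictionary keyed by symbol leaves exactly one entry per symbol, holding its newest price:

```csharp
using System.Collections.Generic;

// Hypothetical tick type for illustration; not the vendor's PriceData type.
public record Tick(string Symbol, decimal Price);

public static class LastValue
{
    // Collapses an interleaved stream of ticks to the most recent price per symbol.
    // A later tick overwrites an earlier one for the same symbol, so a burst of
    // updates costs one entry regardless of how it interleaves with other symbols.
    public static Dictionary<string, decimal> Coalesce(IEnumerable<Tick> ticks)
    {
        var latest = new Dictionary<string, decimal>();
        foreach (var tick in ticks)
            latest[tick.Symbol] = tick.Price;
        return latest;
    }
}
```

Feeding it AAPL 1, MSFT 2, AAPL 3, AAPL 4, MSFT 5 leaves two entries: AAPL at 4 and MSFT at 5.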

My current solution uses a ConcurrentDictionary to hold only the most recent price per symbol, plus a queue of symbols that drives the database updates (see code below), but this still isn't fast enough.

One alternative would be to loop over the whole dictionary repeatedly and write each symbol's most recent price to the database. That is wasteful, though: symbols that only change every few minutes would be written just as often as symbols that change many times a second.
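A sketch of such a full pass that skips unchanged symbols, under these assumptions: prices for a given symbol are published from a single thread, prices are simplified to `decimal`, and `VersionedPrices`, `Publish`, and `CollectChanged` are hypothetical names. Each update gets a sequence number from `Interlocked.Increment`, and the writer keeps its own record of the last version it wrote, so a rarely-changing symbol costs a dictionary lookup per pass rather than a database write:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: versioned last prices with writer-side change detection.
// Assumes updates for any one symbol come from a single producer thread.
public sealed class VersionedPrices
{
    private long _sequence;
    private readonly ConcurrentDictionary<string, (decimal Price, long Version)> _latest = new();
    // Owned by the single writer thread; producers never touch it.
    private readonly Dictionary<string, long> _written = new();

    // Producer: store the newest price tagged with a unique, increasing version.
    public void Publish(string symbol, decimal price)
    {
        long version = Interlocked.Increment(ref _sequence);
        _latest[symbol] = (price, version);
    }

    // Writer: walk every symbol but return only those changed since the last pass.
    public List<(string Symbol, decimal Price)> CollectChanged()
    {
        var changed = new List<(string Symbol, decimal Price)>();
        foreach (var (symbol, entry) in _latest)
        {
            if (_written.TryGetValue(symbol, out long seen) && seen == entry.Version)
                continue; // already written this exact update
            changed.Add((symbol, entry.Price));
            _written[symbol] = entry.Version;
        }
        return changed;
    }
}
```

Compared with a `WrittenToDatabase` flag on a shared object, this keeps all "already written" bookkeeping on the writer side.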

Any thoughts on how this can be done better?

Thanks!

Brian

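      using System;
      using System.Collections.Concurrent;
      using System.Threading;
      using System.Threading.Tasks;
      using Microsoft.Extensions.DependencyInjection;

      // Newest price per symbol; a later price overwrites an earlier one.
      public ConcurrentDictionary<string, QuoddLastPriceCache.PriceData> _lastPrices = new ConcurrentDictionary<string, QuoddLastPriceCache.PriceData>();
      // One entry per received tick, consumed by the writer loop.
      public ConcurrentQueue<string> _lastPriceSymbolsToUpdate = new ConcurrentQueue<string>();
      public volatile bool _lastPriceUpdatesRunning;
      public DateTime _lastScopeCreate = DateTime.MinValue;
      public IServiceScope _lastPriceScope = null;

      public void Start()
      {
          // Run the database writer loop in the background.
          Task.Run(() => UpdateLastPricesTask(services));

          lastPriceCache.PriceReceived += (symbol, priceData) =>
          {
              _lastPrices[symbol] = priceData;           // keep only the newest price
              _lastPriceSymbolsToUpdate.Enqueue(symbol); // queue the symbol for writing
          };
      }

      private void UpdateLastPricesTask(IServiceProvider services)
      {
          _lastPriceUpdatesRunning = true;

          while (_lastPriceUpdatesRunning)
          {
              if (_lastPriceSymbolsToUpdate.TryDequeue(out string symbol))
              {
                  // Fails when an earlier queue entry for this symbol already took its price.
                  if (_lastPrices.TryRemove(symbol, out QuoddLastPriceCache.PriceData priceData))
                  {
                      // Recreate the DI scope (and the IUnitOfWork it resolves) every 5 minutes.
                      if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromMinutes(5) < DateTime.UtcNow)
                      {
                          _lastPriceScope?.Dispose();
                          _lastPriceScope = services.CreateScope();
                          _lastScopeCreate = DateTime.UtcNow; // record creation time so the scope is reused
                      }

                      // Write to the database: one round trip per symbol update.
                      var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
                      unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
                  }
              }
              else
                  Thread.Sleep(1); // queue is empty; back off briefly
          }
      }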
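
One possible refinement of the queue in the code above, sketched under assumptions (the `CoalescingPriceQueue` name and its members are hypothetical, and prices are simplified to `decimal`): let each symbol sit in the queue at most once, so a burst of 10 ticks for one symbol produces one queue entry instead of 10. A second `ConcurrentDictionary` acts as the "already queued" set:

```csharp
using System.Collections.Concurrent;

// Hypothetical coalescing work queue: a symbol is queued at most once,
// however many prices arrive for it before the writer gets to it.
public sealed class CoalescingPriceQueue
{
    private readonly ConcurrentDictionary<string, decimal> _latest = new();
    private readonly ConcurrentDictionary<string, byte> _pending = new();
    private readonly ConcurrentQueue<string> _queue = new();

    // Producer: record the newest price, then enqueue the symbol only if it
    // is not already waiting to be written.
    public void Publish(string symbol, decimal price)
    {
        _latest[symbol] = price;
        if (_pending.TryAdd(symbol, 0))
            _queue.Enqueue(symbol);
    }

    // Consumer: clear the pending marker before reading the price, so a tick
    // arriving after the read re-queues the symbol instead of being lost.
    public bool TryTake(out string symbol, out decimal price)
    {
        while (_queue.TryDequeue(out var next))
        {
            _pending.TryRemove(next, out _);
            if (_latest.TryGetValue(next, out price))
            {
                symbol = next;
                return true;
            }
        }
        symbol = string.Empty;
        price = 0m;
        return false;
    }
}
```

Because the marker is cleared before the price is read, the newest price is never stranded; the worst case is one redundant write of the same value.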
    

Solution

  • The best I could do was the following approach: keep the last values in a dictionary, with a flag recording whether each one has been written to the database, then repeatedly pass over the data and write only the values that have changed since the last pass. It works pretty well, but it seems like there should be a better way.

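        using System;
        using System.Collections.Concurrent;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Extensions.DependencyInjection;

        public void Start()
        {
            // Run the database writer loop in the background.
            Task.Run(() => UpdateLastPricesTask(services));

            // Producer: store the newest price. Assuming each tick delivers a new
            // PriceData instance, it starts out with WrittenToDatabase == false.
            LastPriceCache.PriceReceived += (symbol, priceData) =>
            {
                _lastPrices[symbol] = priceData;
            };
        }

        public ConcurrentDictionary<string, PriceData> _lastPrices = new ConcurrentDictionary<string, PriceData>();
        public volatile bool _lastPriceUpdatesRunning;
        public DateTime _lastScopeCreate = DateTime.MinValue;
        public IServiceScope _lastPriceScope = null;

        private void UpdateLastPricesTask(IServiceProvider services)
        {
            _lastPriceUpdatesRunning = true;

            while (_lastPriceUpdatesRunning)
            {
                var processed = 0;

                // Enumerating a ConcurrentDictionary is thread-safe and, unlike .Keys,
                // does not copy every key into a new collection on each pass.
                foreach (var entry in _lastPrices)
                {
                    var symbol = entry.Key;
                    var priceData = entry.Value;

                    // Skip symbols whose newest price has already been written.
                    if (priceData.WrittenToDatabase)
                        continue;

                    // Create a new scope every 5 minutes.
                    if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromMinutes(5) < DateTime.UtcNow)
                    {
                        _lastPriceScope?.Dispose();
                        _lastPriceScope = services.CreateScope();
                        _lastScopeCreate = DateTime.UtcNow; // record creation time so the scope is reused
                    }

                    // Write to the database, then mark this PriceData instance as written.
                    // A newer tick replaces the instance, so its price stays unwritten.
                    var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
                    unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
                    priceData.WrittenToDatabase = true;

                    processed++;
                }

                // Poll again almost immediately while there is work; otherwise wait a second.
                if (processed > 0)
                    Thread.Sleep(1);
                else
                    Thread.Sleep(1000);
            }
        }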
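
Both versions still make one `UpdateLastPrice` round trip per symbol. A common way to cut that cost, swapped in here as an assumption rather than anything from the original code, is to collect one pass worth of changed prices and send them in a single call. Examples include passing them as a table-valued parameter (`SqlDbType.Structured`) to a stored procedure that updates `SymbolPrice`, or using `SqlBulkCopy` to load them into a staging table. The server-side table type, procedure, and column names are hypothetical and not shown. This sketch only builds the batch:

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical helper: packs changed prices into a DataTable so a whole pass
// can be sent to SQL Server in one round trip. Column names and order are
// assumptions and must match the server-side table type or staging table.
public static class PriceBatch
{
    public static DataTable ToDataTable(IEnumerable<(string Symbol, decimal Price, DateTime Timestamp)> rows)
    {
        var table = new DataTable();
        table.Columns.Add("Symbol", typeof(string));
        table.Columns.Add("Price", typeof(decimal));
        table.Columns.Add("Timestamp", typeof(DateTime));

        // Values are added in column order: Symbol, Price, Timestamp.
        foreach (var (symbol, price, timestamp) in rows)
            table.Rows.Add(symbol, price, timestamp);

        return table;
    }
}
```

With this shape, the per-pass cost becomes one statement for the batch instead of one per changed symbol.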