I have have a high speed stream of stock prices coming from a vendor... maybe 5000 per second. (about 8000 different symbols)
I have a table (SymbolPrice) in my database that needs to be updated with the most recent last price.
I don't seem to be able to keep the database updates fast enough to process the queue of last prices.
I am on an Azure Sql Server database, so I was able to upgrade the database to a premium version that supports In-Memory tables and made my SymbolPrice table an In-Memory table... but still not good enough.
If it ends up skipping a price, this is not a problem, as long as the most recent price gets in there as quick as possible... so if I get blasted with 10 in a row... only the last needs to be written... this sounds easy, except the 10 in a row might intermixed with other symbols.
So, my current solution is to use a ConcurrentDictionary to hold only the most recent price. And use a queue of Symbols to push updates to the database (see code below)... but this still isn't fast enough.
One way to solve this would be to simply repeatedly do a pass through the whole dictionary... and update the database with the most recent price... but this is a little bit of a waste as I would also be updating values that might only be updating every few minutes at the same rate as values that update many times a second.
Any thoughts on how this can be done better?
Thanks!
Brian
public ConcurrentDictionary<string, QuoddLastPriceCache.PriceData> _lastPrices = new ConcurrentDictionary<string, QuoddLastPriceCache.PriceData>();
public ConcurrentQueue<string> _lastPriceSymbolsToUpdate = new ConcurrentQueue<string>();
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
lastPriceCache.PriceReceived += (symbol, priceData) =>
{
_lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; });
_lastPriceSymbolsToUpdate.Enqueue(symbol);
};
}
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
if (_lastPriceSymbolsToUpdate.TryDequeue(out string symbol))
{
if (_lastPrices.TryRemove(symbol, out QuoddLastPriceCache.PriceData priceData))
{
// write to database
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
}
}
else
Thread.Sleep(1);
}
}
The best I could do was the following approach... where I keep the last values in a dictionary and add a flag on whether it's been written to the database... and then make a pass through the data and write updated values to the database... that way I only update the most recently updated values. Works pretty well... seems like there should be a better way though.
public void Start()
{
Task.Run(() => { UpdateLastPricesTask(services); });
LastPriceCache.PriceReceived += (symbol, priceData) =>
{
_lastPrices.AddOrUpdate(symbol, priceData, (key, value) => { return priceData; });
};
}
public ConcurrentDictionary<string, PriceData> _lastPrices = new ConcurrentDictionary<string, PriceData>();
public bool _lastPriceUpdatesRunning;
public DateTime _lastScopeCreate = DateTime.MinValue;
public IServiceScope _lastPriceScope = null;
private void UpdateLastPricesTask(IServiceProvider services)
{
_lastPriceUpdatesRunning = true;
while (_lastPriceUpdatesRunning)
{
var processed = 0;
foreach (var symbol in _lastPrices.Keys)
{
if (_lastPrices.TryGetValue(symbol, out QuoddLastPriceCache.PriceData priceData))
{
if (priceData.WrittenToDatabase == false)
{
// create a new scope every 5 minutes
if (_lastPriceScope == null || _lastScopeCreate + TimeSpan.FromSeconds(60 * 5) < DateTime.UtcNow)
{
if (_lastPriceScope != null)
_lastPriceScope.Dispose();
_lastPriceScope = services.CreateScope();
}
// write to database
var unitOfWork = _lastPriceScope.ServiceProvider.GetRequiredService<IUnitOfWork>();
unitOfWork.SymbolPrice.UpdateLastPrice(symbol, priceData.Price, priceData.Timestamp);
priceData.WrittenToDatabase = true;
processed++;
}
}
}
if (processed > 0)
Thread.Sleep(1);
else
Thread.Sleep(1000 * 1);
}
}