Update: thanks to Matthew Watson for noticing. Note that I plan to port my code to C++ on Linux, so I prefer "platform-independent" code.
My trading application is almost lock-free. The code below is the only place where I use locks. Let me start with the code; it's pretty long, but don't worry: there are a lot of repeating parts, so it's simple. I have kept all the "repeating" parts to better demonstrate how things work:
// Starts a task that calls Iterate() in an endless loop with no sleep or wait
// between passes (busy-spin). LongRunning hints to the scheduler that the work
// is long-running and coarse-grained.
Task.Factory.StartNew(
    () =>
    {
        for (;;)
        {
            Iterate();
        }
    },
    TaskCreationOptions.LongRunning);
/// <summary>
/// Runs one pass of the processing loop: drains each pending-update queue while
/// holding that queue's lock (Stage1) and, if any queue was non-empty, runs Stage2.
/// </summary>
private void Iterate()
{
    // Set as soon as any of the queues is found to hold at least one item.
    bool stage2Required = false;

    // Orders waiting to be registered.
    lock (ordersToRegisterLock)
    {
        if (ordersToRegister.Count > 0)
        {
            stage2Required = true;
            while (ordersToRegister.Count > 0)
            {
                Order pendingOrder = ordersToRegister.Dequeue();
                // Stage1: process the dequeued order here.
            }
        }
    }

    // Aggr updates.
    lock (aggrUpdatesLock)
    {
        if (aggrUpdates.Count > 0)
        {
            stage2Required = true;
            while (!aggrUpdates.IsNullOrEmpty())
            {
                var aggrEntry = aggrUpdates.Dequeue();
                // Stage1: process the dequeued entry here.
            }
        }
    }

    // Common updates.
    lock (commonUpdatesLock)
    {
        if (commonUpdates.Count > 0)
        {
            stage2Required = true;
            while (!commonUpdates.IsNullOrEmpty())
            {
                var commonEntry = commonUpdates.Dequeue();
                // Stage1: process the dequeued entry here.
            }
        }
    }

    // Info updates.
    lock (infoUpdatesLock)
    {
        if (infoUpdates.Count > 0)
        {
            stage2Required = true;
            while (!infoUpdates.IsNullOrEmpty())
            {
                var infoEntry = infoUpdates.Dequeue();
                // Stage1: process the dequeued entry here.
            }
        }
    }

    // Trade updates.
    lock (tradeUpdatesLock)
    {
        if (tradeUpdates.Count > 0)
        {
            stage2Required = true;
            while (!tradeUpdates.IsNullOrEmpty())
            {
                var tradeEntry = tradeUpdates.Dequeue();
                // Stage1: process the dequeued entry here.
            }
        }
    }

    if (stage2Required)
    {
        // Stage2: the expensive part. Recalculate strategies, place orders, etc.
        // It runs with none of the queue locks held, so the handlers can keep enqueuing.
    }
}
// Pending-update queues. Each one is filled by its handler method and drained by Iterate().
private readonly Queue<Order> ordersToRegister = new Queue<Order>();
private readonly Queue<AggrEntry> aggrUpdates = new Queue<AggrEntry>();
private readonly Queue<CommonEntry> commonUpdates = new Queue<CommonEntry>();
private readonly Queue<InfoEntry> infoUpdates = new Queue<InfoEntry>();
private readonly Queue<TradeEntry> tradeUpdates = new Queue<TradeEntry>();

// One dedicated lock object per queue; every access to a queue happens inside
// a lock on the matching object.
private readonly object ordersToRegisterLock = new object();
private readonly object aggrUpdatesLock = new object();
private readonly object commonUpdatesLock = new object();
private readonly object infoUpdatesLock = new object();
private readonly object tradeUpdatesLock = new object();
/// <summary>
/// Queues the order carried by <paramref name="e"/> so that a later Iterate() pass can pick it up.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>order</c> member is enqueued.</param>
public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
{
    // Same lock object that Iterate() holds while draining ordersToRegister.
    lock (ordersToRegisterLock)
    {
        var incomingOrder = e.order;
        ordersToRegister.Enqueue(incomingOrder);
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>tradeUpdates</c> for a later Iterate() pass.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void TradeUpdated(object sender, Gate.TradeArgs e)
{
    // The whole batch is enqueued under a single acquisition of the lock.
    lock (tradeUpdatesLock)
    {
        foreach (var tradeEntry in e.entries)
        {
            tradeUpdates.Enqueue(tradeEntry);
        }
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>infoUpdates</c> for a later Iterate() pass.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void InfoUpdated(object sender, Gate.InfoArgs e)
{
    // The whole batch is enqueued under a single acquisition of the lock.
    lock (infoUpdatesLock)
    {
        foreach (var infoEntry in e.entries)
        {
            infoUpdates.Enqueue(infoEntry);
        }
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>commonUpdates</c> for a later Iterate() pass.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void CommonUpdated(object sender, Gate.CommonArgs e)
{
    // The whole batch is enqueued under a single acquisition of the lock.
    lock (commonUpdatesLock)
    {
        foreach (var commonEntry in e.entries)
        {
            commonUpdates.Enqueue(commonEntry);
        }
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>aggrUpdates</c> for a later Iterate() pass.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void AggrUpdated(object sender, Gate.AggrArgs e)
{
    // The whole batch is enqueued under a single acquisition of the lock.
    lock (aggrUpdatesLock)
    {
        foreach (var aggrEntry in e.entries)
        {
            aggrUpdates.Enqueue(aggrEntry);
        }
    }
}
In my code I have two stages: Stage1 is the update stage and Stage2 is the working stage. I need to switch between these two stages as fast as possible, like this:
Stage2
Stage2
Stage2
In Stage2 I should not apply updates, but I should keep "collecting" them so I can apply them later.
An important point: this is very latency-critical code, so I am willing to "spend" one core to get minimal latency! So when any update occurs, I need to process it as soon as possible and then perform Stage2.
So I hope now it's clear what I need to achieve and it's clear how I have implemented that. Now it's time to discuss how good my code is. I do see several potential problems:
Any suggestions how to improve what I wrote are welcome, thanks!
Update: partly solved. As I understand it, I had better replace the queues with lock-free (likely ring-buffer-based?) queues; I think I will use the C++ version of Disruptor later. Also, following this article http://www.umbraworks.net/bl0g/rebuildall/2010/03/08/Running_NET_threads_on_selected_processor_cores I replaced the Task with a Thread running on a "fixed" core. However, I'm still using a "busy-spin" loop; should I probably use something smarter?
With the code below, you are no longer locked during "stage 1" processing:
// Starts a task that calls Iterate() in an endless loop with no sleep or wait
// between passes (busy-spin). LongRunning hints to the scheduler that the work
// is long-running and coarse-grained.
Task.Factory.StartNew(
    () =>
    {
        for (;;)
        {
            Iterate();
        }
    },
    TaskCreationOptions.LongRunning);
/// <summary>
/// Runs one pass of the processing loop: drains every pending-update queue (Stage1)
/// and, if at least one item was drained, runs Stage2.
/// </summary>
/// <remarks>
/// Items are removed with <c>TryDequeue</c>. Enumerating a <c>ConcurrentQueue</c> with
/// <c>foreach</c> only walks a snapshot and leaves every item in the queue, so the same
/// entries would be processed again on each pass, the queues would grow without bound,
/// and Stage2 would run on every pass after the first update arrived.
/// </remarks>
private void Iterate()
{
    bool marketDataUpdated = false;

    // Each loop ends as soon as TryDequeue finds its queue empty; items enqueued
    // after that point are picked up by the next pass.
    Order order;
    while (ordersToRegister.TryDequeue(out order))
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    AggrEntry aggrEntry;
    while (aggrUpdates.TryDequeue(out aggrEntry))
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    CommonEntry commonEntry;
    while (commonUpdates.TryDequeue(out commonEntry))
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    InfoEntry infoEntry;
    while (infoUpdates.TryDequeue(out infoEntry))
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    TradeEntry tradeEntry;
    while (tradeUpdates.TryDequeue(out tradeEntry))
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    if (marketDataUpdated)
    {
        // Stage2: the expensive part. Recalculate strategies, place orders, etc.
    }
}
// Pending-update queues. Each one is filled by its handler method and read by Iterate().
// They are ConcurrentQueue instances, and no lock statement is used around them.
private readonly ConcurrentQueue<AggrEntry> aggrUpdates = new ConcurrentQueue<AggrEntry>();
private readonly ConcurrentQueue<CommonEntry> commonUpdates = new ConcurrentQueue<CommonEntry>();
private readonly ConcurrentQueue<InfoEntry> infoUpdates = new ConcurrentQueue<InfoEntry>();
private readonly ConcurrentQueue<Order> ordersToRegister = new ConcurrentQueue<Order>();
private readonly ConcurrentQueue<TradeEntry> tradeUpdates = new ConcurrentQueue<TradeEntry>();
/// <summary>
/// Queues the order carried by <paramref name="e"/> on <c>ordersToRegister</c>.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>order</c> member is enqueued.</param>
public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
{
    var incomingOrder = e.order;
    ordersToRegister.Enqueue(incomingOrder);
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>tradeUpdates</c>, one Enqueue call per entry.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void TradeUpdated(object sender, Gate.TradeArgs e)
{
    foreach (var tradeEntry in e.entries)
    {
        tradeUpdates.Enqueue(tradeEntry);
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>infoUpdates</c>, one Enqueue call per entry.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void InfoUpdated(object sender, Gate.InfoArgs e)
{
    foreach (var infoEntry in e.entries)
    {
        infoUpdates.Enqueue(infoEntry);
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>commonUpdates</c>, one Enqueue call per entry.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void CommonUpdated(object sender, Gate.CommonArgs e)
{
    foreach (var commonEntry in e.entries)
    {
        commonUpdates.Enqueue(commonEntry);
    }
}
/// <summary>
/// Appends every entry in <c>e.entries</c> to <c>aggrUpdates</c>, one Enqueue call per entry.
/// </summary>
/// <param name="sender">Not used by this method.</param>
/// <param name="e">Arguments whose <c>entries</c> are enqueued in enumeration order.</param>
public void AggrUpdated(object sender, Gate.AggrArgs e)
{
    foreach (var aggrEntry in e.entries)
    {
        aggrUpdates.Enqueue(aggrEntry);
    }
}