I have the following class below that is used to perform trade operations for buying and selling subscriptions. I would like convert this class so it can be used within a micro-service using event sourcing and possibly CQRS. The idea I had in my mind is this would live inside of a Service Fabric Actor where this class would be entirely in memory.
public class OrderBook
{
public const int ScaleFactor = 10_000;
private long _orderId = 1;
public OrderBook()
{
Limits = new RankedSet<Limit>(new LimitPriceComparer()) { new Limit { Price = 1 * ScaleFactor } };
Subscriptions = new Dictionary<long, Subscription>();
Orders = new Dictionary<long, Order>();
}
private RankedSet<Limit> Limits { get; }
private IDictionary<long, Subscription> Subscriptions { get; }
private IDictionary<long, Order> Orders { get; }
public Order Ask(long userId, long price, int shares)
{
if (userId <= 0 || price <= 0 || shares <= 0)
{
// TODO: Return a message or something.
return null;
}
price = price * ScaleFactor;
// Get the users subscription.
if (!Subscriptions.TryGetValue(userId, out Subscription subscription))
{
// TODO: Return a message or something.
return null;
}
var index = Limits.Count - 1;
var originalShares = shares;
while (index >= 0 && shares > 0)
{
var currentLimit = Limits.ElementAt(index);
if (currentLimit.Price < price)
{
break;
}
Order order = currentLimit.BidHead;
while (order != null && shares > 0)
{
if (order.Subscription.UserId == userId)
{
if (order.Next == null)
{
break;
}
else
{
order = order.Next;
}
}
// Always assume the bid will have a subscription even if it's empty.
if (order.Shares >= shares)
{
order.Subscription.Owned += shares;
order.Shares -= shares;
shares = 0;
}
else
{
order.Subscription.Owned += order.Shares;
shares -= order.Shares;
order.Shares = 0;
}
order = order.Next;
}
index--;
}
if (shares > 0)
{
subscription.Owned -= originalShares - shares;
var newOrder = new Order { Id = /*Interlocked.Increment(ref _orderId)*/_orderId++, Shares = shares, Subscription = subscription };
// At this point Limits is guaranteed to have a single Limit.
var prevLimit = Limits.ElementAt(index == Limits.Count - 1 ? index : ++index);
if (prevLimit.Price == price)
{
newOrder.ParentLimit = prevLimit;
if (prevLimit.AskHead == null)
{
prevLimit.AskHead = newOrder;
}
else
{
newOrder.Next = prevLimit.AskHead;
prevLimit.AskHead.Prev = newOrder;
prevLimit.AskHead = newOrder;
}
}
else
{
var newLimit = new Limit { AskHead = newOrder, Price = price };
newOrder.ParentLimit = newLimit;
Limits.Add(newLimit);
}
Orders.Add(newOrder.Id, newOrder);
return newOrder;
}
else
{
subscription.Owned -= originalShares;
}
return null;
}
}
Here's a start on what I think a conversion to an aggregate would look like. The trouble I'm running into is when a TradeExecutedEvent is raised it needs to modify the state of the aggregate as a whole. In other words if that event was fired by itself it wouldn't make sense since it's dependent on events that come before it. The only reason I thought I need a TradeExecutedEvent is to notify on the UI that their trade has been executed.
Would it be better to still store the TradeExecutedEvent in Event Store but just not have a corresponding Apply method for it so other services/subscribers can be notified that a trade happened?
It feels to me I have thought about this entirely wrong since think aggregates are suppose to be transient and not long lived like this one would be. I would appreciate any suggestions or guidance.
public class TradeAggregate : AggregateBase
{
private const int ScaleFactor = 10_000;
private RankedSet<Limit> Limits { get; }
private IDictionary<long, Subscription> Subscriptions { get; }
private IDictionary<long, Order> Orders { get; }
public TradeAggregate(string asset)
{
Limits = new RankedSet<Limit>(new LimitPriceComparer()) { new Limit { Price = 1 * ScaleFactor } };
Subscriptions = new Dictionary<long, Subscription>();
Orders = new Dictionary<long, Order>();
}
public void Ask(long userId, long price, int shares)
{
if (userId <= 0 || price <= 0 || shares <= 0)
{
// TODO: Return a message or something.
return;
}
price = price * ScaleFactor;
if (!Subscriptions.TryGetValue(userId, out Subscription subscription))
{
throw new System.Exception("You do not own this subscription.");
}
RaiseEvent(new AskOrderPlacedEvent(subscription, price, shares));
}
public void Apply(AskOrderPlacedEvent e)
{
var index = Limits.Count - 1;
var shares = e.Shares;
while (index >= 0 && shares > 0)
{
var currentLimit = Limits.ElementAt(index);
if (currentLimit.Price < e.Price)
{
break;
}
Order order = currentLimit.BidHead;
while (order != null && shares > 0)
{
if (order.Subscription.UserId == e.Subscription.UserId)
{
if (order.Next == null)
{
break;
}
else
{
order = order.Next;
}
}
// Always assume the bid will have a subscription even if it's empty.
if (order.Shares >= shares)
{
RaiseEvent(new TradePartiallyExecutedEvent(order, shares, e.Subscription, e.Shares));
shares = 0;
}
else
{
RaiseEvent(new TradeExecutedEvent(order, shares, e.Subscription, e.Shares));
shares -= order.Shares;
}
order = order.Next;
}
index--;
}
if (shares > 0)
{
// .... etc.
}
else
{
// .... etc.
}
}
public void Apply(TradePartiallyExecutedEvent e)
{
e.Order.Subscription.Owned += e.Shares;
e.Order.Shares -= e.Shares;
e.Subscription.Owned -= e.OriginalShares - e.Shares;
}
public void Apply(TradeExecutedEvent e)
{
e.Order.Subscription.Owned += e.Order.Shares;
e.Order.Shares = 0;
e.Subscription.Owned -= e.OriginalShares;
}
}
If I don't missunderstand you, what you are trying to do is that the aggregate reacts to an event raised by itself.
It has no sense. The aggregate is transactionally consistent, so that you can do all the operations in one transaction without using events.
Events are for eventual consistence between different aggregates of the same BC, or between two BCs, since in a transaction just the state of one aggregate is modified. The aggregate must inform to the rest of the world outside its transactional boundary that something has ocurred by raising an event.
I think that maybe you should take a look to your domain to check wether you must split the aggregate into several ones. If so, use events to communicate them asynchronously. Otherwise (having just one aggregate), you don't need listen to events, just raise them to store them in the event store.