I am trying to convert the below statement so that I can get the key alongside the selected list:
var feed = new Subject<TradeExecuted>();
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() => Mediator.Publish(trades, cts.Token)))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
The above works, whereas the below does not: when I try to iterate over the list, it locks up.
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.Select(x => Observable.FromAsync(() =>
{
var list = x.ToEnumerable(); // <---- LOCK UP if we use list.First() etc
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat()
.Subscribe(cts.Token); // Check status of calls.
I am clearly doing something wrong and probably horrific!
Going back to the original code, how can I get the Key alongside the enumerable list (and avoiding the hack below)?
As a side note, the below code works, but it is a nasty hack where I get the keys from the first list item:
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(x => x.ToList())
.Select(trades => Observable.FromAsync(() =>
{
var firstTrade = trades.First();
var aggregate = AggregateTrades(firstTrade.Execution.Contract.Symbol, firstTrade.Execution.AccountId, firstTrade.Tenant, trades);
return Mediator.Publish(aggregate, cts.Token);
}))
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
All versions of your code suffer from trying to eagerly evaluate the grouped sub-observable. Since in v1 and v3 your group observable will run for a maximum of 5 seconds, that isn't horrible, but it's still not great. In v2, I don't know what timespan is, but assuming it's 5 seconds, you have the same problem: turning the grouped sub-observable into a list or an enumerable means waiting for the sub-observable to complete, and with ToEnumerable() in particular, enumerating the result (for example with First()) blocks the calling thread.
You can fix this by using the Buffer operator to lazily evaluate the grouped sub-observable:
var timespan = TimeSpan.FromSeconds(5);
feed
.GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
.SelectMany(x => x
.Buffer(timespan)
.Select(list => Observable.FromAsync(() =>
{
var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
return Mediator.Publish(aggregate, cts.Token);
}))
)
.Concat() // Ensure that the results are serialized.
.Subscribe(cts.Token); // Check status of calls.
This essentially means that until timespan is up, the items in the group gather in a list inside Buffer. Once timespan is up, they're released as a list, and the mediator publish happens.
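If you would rather stay close to your original ToList() version, one possible way to carry the key along is to project it inside the SelectMany, while the group is still in scope. The following is only a sketch under assumptions: Trade and PublishAsync are hypothetical stand-ins for your TradeExecuted and Mediator.Publish, the key is shortened to two fields, and it needs the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, flattened stand-in for TradeExecuted.
public record Trade(string Symbol, string AccountId, int Quantity);

public static class Program
{
    // Hypothetical stand-in for AggregateTrades + Mediator.Publish.
    static Task PublishAsync(string symbol, string accountId, IList<Trade> trades, CancellationToken ct)
    {
        Console.WriteLine($"{symbol}/{accountId}: {trades.Count} trade(s)");
        return Task.CompletedTask;
    }

    public static void Main()
    {
        using var cts = new CancellationTokenSource();
        var feed = new Subject<Trade>();
        var timespan = TimeSpan.FromSeconds(5);

        feed
            .GroupByUntil(t => (t.Symbol, t.AccountId), _ => Observable.Timer(timespan))
            // ToList() emits a single list when the group completes;
            // pairing it with g.Key here keeps the key next to its trades.
            .SelectMany(g => g.ToList().Select(trades => (g.Key, Trades: trades)))
            .Select(b => Observable.FromAsync(() =>
                PublishAsync(b.Key.Symbol, b.Key.AccountId, b.Trades, cts.Token)))
            .Concat() // Ensure that the results are serialized.
            .Subscribe(cts.Token);

        feed.OnNext(new Trade("ABC", "acc-1", 10));
        feed.OnNext(new Trade("ABC", "acc-1", 5));
        feed.OnNext(new Trade("XYZ", "acc-2", 7));

        // Completing the source closes every open group, so each list
        // is flushed without waiting for the timer in this demo.
        feed.OnCompleted();
    }
}
```

The tuple element names (Symbol, AccountId, Key) are inferred by the compiler, so b.Key.Symbol reads the same way as x.Key.Symbol does in the snippets above.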