Search code examples
c#rx.net

rx.net locking up from use of ToEnumerable


I am trying to convert the below statement so that I can get the key alongside the selected list:

var feed = new Subject<TradeExecuted>();

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
  .SelectMany(x => x.ToList())
  .Select(trades => Observable.FromAsync(() => Mediator.Publish(trades, cts.Token)))
  .Concat() // Ensure that the results are serialized.
  .Subscribe(cts.Token); // Check status of calls.

The above works, whereas the below does not - when I try and itterate over the list, it locks up.

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
  .Select(x => Observable.FromAsync(() =>
  {
      var list = x.ToEnumerable(); // <---- LOCK UP if we use list.First() etc
      var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list);
      return Mediator.Publish(aggregate, cts.Token);
  }))
  .Concat()
  .Subscribe(cts.Token); // Check status of calls.

I am clearly doing something wrong and probably horrific!

Going back to the original code, how can I get the Key alongside the enumerable list (and avoiding the hack below)?


As a sidenote, the below code works but it a nasty hack where I get the keys from the first list item:

feed
  .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
  .SelectMany(x => x.ToList())
  .Select(trades => Observable.FromAsync(() =>
  {
      var firstTrade = trades.First();
      var aggregate = AggregateTrades(firstTrade.Execution.Contract.Symbol, firstTrade.Execution.AccountId, firstTrade.Tenant, trades);
      return Mediator.Publish(aggregate, cts.Token);
  }))
  .Concat() // Ensure that the results are serialized.
  .Subscribe(cts.Token); // Check status of calls.

Solution

  • All versions of your code suffer from trying to eagerly evaluate the grouped sub-observable. Since in v1 and v3 your group observable will run a maximum of 5 seconds, that isn't horrible/awful, but it's still not great. In v2, I don't know what timespan is, but assuming it's 5 seconds, you have the same problem: Trying to turn the grouped sub-observable into a list or an enumerable means waiting for the sub-observable to complete, blocking the thread (or the task).

    You can fix this by using the Buffer operator to lazily evaluate the grouped sub-observable:

    var timespan = TimeSpan.FromSeconds(5);
    feed
      .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(timespan))
      .SelectMany(x => x
        .Buffer(timespan)
        .Select(list => Observable.FromAsync(() => 
        {
          var aggregate = AggregateTrades(x.Key.Symbol, x.Key.AccountId, x.Key.Tenant, list));
          return Mediator.Publish(aggregate, cts.Token);
        }))
      )
      .Concat()  // Ensure that the results are serialized.
      .Subscribe(cts.Token); // Check status of calls.
    

    This essentially means that until timespan is up, the items in the group by gather in a list inside Buffer. Once timespan is up, they're released as a list, and the mediator publish happens.