How to Aggregate with GroupBy and wait for results


Say, for example, I'm keeping track of the distance each player has travelled for a cricket roster. I might have the following objects:

  • Trip (IList<Leg> legs)
  • Leg (distance, duration, player and belongs to trip)
  • Player (belongs to team)
  • Team

I would like to aggregate this data using Reactive Extensions. Here's my first attempt:

var trips = new List<Trip>();

Observable.Return( trips )
  .SelectMany( ts => ts )
  .SelectMany( trip => trip.legs )
  .GroupBy( leg => leg.player.team )
  .Select( teamLegs => {
    var teamSummary = new {
      team = teamLegs.Key,
      distance = 0M,
      duration = 0M
    };

    teamLegs.Sum( x => x.distance ).Subscribe( x => { teamSummary.distance = x; } );
    teamLegs.Sum( x => x.duration ).Subscribe( x => { teamSummary.duration = x; } );

    return teamSummary;
  })
  .Select(teamSummary => {
      // If I try to do something with teamSummary.distance or duration - the above
      // sum is yet to be completed 
  })

  // ToList will make the above sums work, but only if there's only 1 Select statement above
  .ToList()

  .Subscribe(teamSummaries => {
  });

How can I make sure that the sums are completed before the second Select() statement?


Solution

  • An observable is awaitable. Awaiting it waits for the sequence to complete and returns its last item.

    So instead of subscribing to the sums, you can await them. That way the summary produced by the first Select only becomes available once both results are ready.

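    .Select(async teamLegs =>
    {
        // Subscribe to both sums up front (ToTask is in System.Reactive.Threading.Tasks):
        // a sum subscribed only after the first await would miss the group's items.
        var distance = teamLegs.Sum(x => x.distance).ToTask();
        var duration = teamLegs.Sum(x => x.duration).ToTask();
        return new {
            team = teamLegs.Key,
            distance = await distance,
            duration = await duration
        };
    })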
    ...
    

    With an async lambda, Select returns IObservable<Task<(type of teamSummary)>>, so use SelectMany(...) instead to get IObservable<(type of teamSummary)>.
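
    For reference, here is a minimal end-to-end sketch of the await approach with SelectMany. The record types (Team, Player, Leg, Trip), their PascalCase property names and the sample data are made up for illustration and are not from the question. The sketch starts both sums with ToTask() before awaiting either one, on the assumption that the groups produced by GroupBy are hot, so a sum subscribed later could miss the group's items. It needs the System.Reactive package and C# 9 or later.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;

    // Hypothetical model types, loosely named after the objects in the question.
    public record Team(string Name);
    public record Player(string Name, Team Team);
    public record Leg(decimal Distance, decimal Duration, Player Player);
    public record Trip(IList<Leg> Legs);

    public static class Program
    {
        public static async Task Main()
        {
            // Made-up sample data: two teams, one player each.
            var lions = new Team("Lions");
            var tigers = new Team("Tigers");
            var ann = new Player("Ann", lions);
            var bob = new Player("Bob", tigers);

            var trips = new List<Trip>
            {
                new Trip(new List<Leg> { new Leg(1.5M, 10M, ann), new Leg(2.0M, 20M, bob) }),
                new Trip(new List<Leg> { new Leg(3.0M, 30M, ann) }),
            };

            // Awaiting the pipeline waits for completion and yields its last item,
            // which here is the single list emitted by ToList().
            var summaries = await trips.ToObservable()
                .SelectMany(trip => trip.Legs)
                .GroupBy(leg => leg.Player.Team)
                .SelectMany(async teamLegs =>
                {
                    // Subscribe to both sums immediately, before any await.
                    var distance = teamLegs.Sum(x => x.Distance).ToTask();
                    var duration = teamLegs.Sum(x => x.Duration).ToTask();

                    return new
                    {
                        Team = teamLegs.Key,
                        Distance = await distance,
                        Duration = await duration
                    };
                })
                .ToList();

            // Every summary here already carries its final totals.
            foreach (var s in summaries)
            {
                Console.WriteLine($"{s.Team.Name}: distance {s.Distance}, duration {s.Duration}");
            }
        }
    }
    ```

    With this sample data the Lions legs add up to a distance of 4.5 and a duration of 40, and the Tigers leg to 2.0 and 20. The order in which the two summaries appear is not something the sketch relies on.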