Search code examples
c#system.reactiveobservablereactiveuiunfold

Reactive Extensions unfold / scan approach for nested hierarchy


I'm currently building a wizard system for an application, and we're using ReactiveUI and as a result Rx.

Each step in the wizard implements IWizardStep<T> where T is just the data type that the wizard ultimately produces.

Each step has the capability of surfacing which step should be available for the user to move to next, so as to enable branching based on data entry.

The step can be thought of having a similar structure to:

public interface IWizardStep<T>
{
    IObservable<IStepChoice<T>> NextStepChoice {get;}
}

With IStepChoice<T> simply being:

public interface IStepChoice<T>
{
    IWizardStep<T> Step {get;}
    string Reason {get;}
}

In order to calculate the current path from the start to the end, for display to the user, I need to be able to go from the starting step, and walk along the NextStepChoice chain recursively until it hits a null (it's valid behaviour for the NextStepChoice observable to emit a null to indicate the end of the wizard).

I've taken a look at Observable.Scan but can't for the life of me work out how to get this working properly recursively.

I've also taken a look at Observable.Generate which looks promising, as this is a classic unfold-esque problem; the only issue is that Generate takes a function to determine when to break the loop, but I need to evaluate the inner observable to work this out.

Observable.Generate(
    new WizardStepChoice<T>(start, null),
    choice => choice != null,
    choice => choice.ChosenStep.NextStepChoice,
    choice => choice);

This would be ideal, and produce the output I'm after, but the NextStepChoice selector there obviously doesn't compile because it's an IObservable<IWizardStepChoice<T>> rather than an IWizardStepChoice<T>.

I've looked at using Aggregate and Scan but as these are more fold-driven operations, and I've only got the starting element, it's an unfold I'm looking for ala Generate, but I need it to be able to evaluate the nested observable.

Would Observable.Create perhaps be something I could utilise? I've tried it and come up with:

Path = CurrentStep.Select(_ => Observable.Create<IWizardStep<T>>(async observer =>
 {
     IWizardStepChoice<T> next = new WizardStepChoice<T>(start, null);
     observer.OnNext(next.ChosenStep);
     while (next != null)
     {
         next = await next.ChosenStep.NextStepChoice;
         observer.OnNext(next.ChosenStep);
     }
     observer.OnCompleted();
     return Disposable.Empty;
 }).Aggregate(new List<IWizardStep<T>>(),
 (l, s) =>
 {
     l.Add(s);
     return l;
 })).Switch().Publish().RefCount();

Which has all the right signature I want IWizardStep<T>->IReadOnlyList<IWizardStep<T>>, so at first glance it looks right, but it doesn't work; it fires, and I can step through, but it hangs once it hits the await and doesn't come back.

I've got a feeling I'm close, and this is a scheduling issue, so my question really is this:

  1. What is the best approach to solve this, am I close?
  2. If this is right, why is there an issue with the await, and how might I solve it?

Update

After a little bit of tinkering I noticed that the await was likely hanging as that observable hadn't yet (and wasn't going to) emit a value (duh), which I've now resolved by initialising each step with a value at the beginning of the wizard.

I've even sanity-checked this by adding a property to IWizardStep<T> - IWizardStepChoice<T> LatestStepChoice {get;} which is just hooked up with:

NextStepChoice.Subscribe(c => _latestStepChoice = c);

This is done on the step class itself, and I can confirm it's working just fine.

Yet still the await hangs, so I tried:

  1. Making it Replay(1) so the await calling .Subscribe() would get the value - this didn't work
  2. Making it Repeat() so even if something is subscribed it'll see the new value - this just made the whole thing hang.

Clearly I'm missing something here, I want it so that when the await queries the observable, it will be given the latest value seen, which is what I thought Replay(1) would achieve; I've also tried PublishLast() but then future updates don't get honoured because of the AsyncSubject<T> behaviour.

For now I've switched to using the self-subscribed property, but it's not ideal, I'd rather not have to break out of querying the observables if I can help it, it feels "hacky".


Solution

  • A recursive walk can transform the tree of observables into a single observable:

        static IObservable<IWizardStep<T>> Walk<T>(IWizardStep<T> step)
        {
            if (step?.NextStepChoice == null)
                return Observable.Return(step);
    
            return step.NextStepChoice.SelectMany(choice => Walk(choice.Step)).StartWith(step);
        }
    

    Usage:

    var steps = await Walk(step).ToArray();