c# unity-game-engine reactivex rx.net unirx

How to combine IObservable sequences in UniRx/Rx.NET?


I'm using the UniRx flavor of Reactive Extensions for the Unity3D game engine. Unity uses C#, so I guess it's similar to Rx.NET.

I'm looking for a cleaner way to detect when several observable sequences have completed.

In the example below, the second sequence depends on the result of the first (it needs an integer for processID).

The observables are both of type IObservable<string>.

var processListObservable = APIBuilder
    .GetProcessList(authInfo.Token, authInfo.PlatformURL, (int)product.Id)
    .Subscribe(listJson =>
    {
        processList = ProcessList.FromJson(listJson);
        int processID = (int)processList.Processes[0].ProcessDataId;

        //Retrieve Detailed information of the first entry
        var processDetailsObservable = APIBuilder
            .GetProcessDetails(token, platformURL, product.Id, processID)
            .Subscribe(detailsJson =>
            {
                processData = ProcessData.FromJson(detailsJson);
                SetupPlotView();
            });
    });

Any hint would be highly appreciated. I would also welcome suggestions for the same scenario without the dependency on the result of the first sequence.


Solution

  • Instead of putting your code into the Subscribe handler, you can make it part of the sequence. Use the Select operator to project each listJson to an IObservable<string> (resulting in a nested IObservable<IObservable<string>>), and then flatten the sequence with either the Concat or the Merge operator, depending on whether you want to prevent or allow concurrency.

    var processListObservable = APIBuilder
        .GetProcessList(authInfo.Token, authInfo.PlatformURL, (int)product.Id)
        .Select(listJson =>
        {
            var processList = ProcessList.FromJson(listJson);
            int processID = (int)processList.Processes[0].ProcessDataId;
            return APIBuilder.GetProcessDetails(token, platformURL, product.Id, processID);
        })
        .Concat() // or .Merge() to allow concurrency
        .ObserveOn(SynchronizationContext.Current) // Optional
        .Do(detailsJson =>
        {
            var processData = ProcessData.FromJson(detailsJson);
            SetupPlotView(processData);
        });
    
    await processListObservable.DefaultIfEmpty(); // Start and await the operation
    

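    The await on the final line implicitly subscribes to processListObservable, and your code runs as a side effect of that subscription. The DefaultIfEmpty() call is there because awaiting a sequence that completes without emitting any element throws an InvalidOperationException.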
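
The question also asks about the case where the two requests do not depend on each other. One possible approach, sketched here under assumptions, is to combine the two sequences with the Zip operator and await the combined result. The example targets Rx.NET (the System.Reactive package); UniRx offers a Zip operator as well, but this has not been verified against it. The class name, the two parameterless stand-in methods, and their return values are hypothetical placeholders for the real APIBuilder calls.

```csharp
using System;
using System.Reactive.Linq;   // Rx.NET (System.Reactive package); assumption, not UniRx
using System.Threading.Tasks;

public static class IndependentRequests
{
    // Hypothetical stand-ins for two API calls that do not depend on each other.
    // Each one emits a single JSON string and then completes.
    static IObservable<string> GetProcessList() => Observable.Return("list-json");
    static IObservable<string> GetProcessDetails() => Observable.Return("details-json");

    public static async Task<string> LoadBothAsync()
    {
        // Zip pairs the n-th element of one sequence with the n-th element of the other.
        // With single-value sequences it emits one combined value once both
        // sources have produced theirs, and then completes.
        IObservable<string> combined = GetProcessList().Zip(
            GetProcessDetails(),
            (listJson, detailsJson) => listJson + "|" + detailsJson);

        // Awaiting subscribes to the sequence and yields its last element.
        // DefaultIfEmpty guards against an exception on an empty sequence.
        return await combined.DefaultIfEmpty();
    }
}
```

Calling await IndependentRequests.LoadBothAsync() from an async method yields the combined string once both sources have emitted. In real code the result selector would parse both JSON payloads (for example with ProcessList.FromJson and ProcessData.FromJson) instead of concatenating strings.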