Search code examples
c#reactiveunirx

UniRX using Merge operator for Shared observable


I have issue when using Merge for shared observables. In my project I have several streams that loads different data which must be added by certain order. So I've made simple example to find a solution. And the issue is that the 2nd merged observable won't get emitted values for obvious reasons. If I remove Share operator everything would be fine but in this case root observable executes 2 times. So another option is to add Replay operator after Share. But than I have to use Connect somewhere. Unfortunately in my project the observable is just a small part of huge loading chain. And that's where I stucked.

Next code shows what problem is. observableFlatMap variable doesn't emit anything because every value that sharedObservable emits goes through observableNotEven and observableFlatMap connects only after every integer goes away.


using System.Linq;
using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

 public class Share : MonoBehaviour
    {
        void Start()
        {
            PrintNumbers();
        }

        private void PrintNumbers()
        {
            System.IObservable<int> sharedObservable = GetObservableInts();

            var observableEven = sharedObservable.
                Where(x => x % 2 == 0)
                .Debug("Even");

            var observableNotEven = sharedObservable.
                Where(x => x % 2 == 1)
                .Debug("NotEven");

            var observableFlatMap = observableEven
                .Select(x => x * 10);

            _ = Observable.Merge(observableNotEven, observableFlatMap)
                .Subscribe(_number => Debug.Log(_number))
                .AddTo(this);
        }

        private static System.IObservable<int> GetObservableInts()
        {
            var count = 10;
            var arrayInt = new int[count];
            for (int id = 0; id != count; ++id)
                arrayInt[id] = id;

            var sharedObservable = arrayInt.ToObservable()
                .Debug("Array")
                .Share();

            return sharedObservable;
        }

Solution

  • I found 2 similar solution: Use this operators .DelayFrame/Delay or .DelayFramsSubscription/DelaySubscription before .Share Feature of .Share behaviour is that it starts to emit values after first subscription and it continues to emit until last subscriber unsubscribes. And in my case after first subscriber (observableNotEven) .Share emits each value of integer array before next observable (observableFlatMap) connected (merged) to general observable sequence

     var sharedObservable = arrayInt.ToObservable()
        .Debug("Array")
        .DelayFrameSubscription(1)
        .Share();
    

    Update

    The answer to my question is to use Publish even if you get shared observable.

    private void PrintNumbers()
    {
        IConnectableObservable<int> sharedObservable = GetObservableInts().Publish();
    
        var observableEven = sharedObservable
            .Where(x => x % 2 == 0)
            .Debug("Even");
    
        var observableNotEven = sharedObservable
            .Where(x => x % 2 == 1)
            .Debug("NotEven");
    
        var observableFlatMap = observableEven
            .Select(x => x * 10);
    
        _ = Observable.Merge(observableNotEven, observableFlatMap)
            .Subscribe(_number => Debug.Log(_number))
            .AddTo(this);
    
        //After observable chain building is complete. You should use *Connect*
        sharedObservable.Connect().AddTo(this);
    }