Search code examples
c#system.reactive

Non-intersecting LeftDurationSelector in Reactive Extensions Join


I want to create an observable that emits when the right observable emits a value and takes the last left observable value with it, i.e., as described in the following marble diagram (taken from combining-sequences)

enter image description here

reading through the documentation in the above link suggests that this can be achieved by ensuring a non-intersecting left duration selector and a right duration selector that returns immediately. So I tried this:

leftObservable.Join(rightObservable, _ => leftObservable.Publish().RefCount(), _ => Observable.Empty<Unit>(), (l,r) => (l,r))

but that doesn't seem to emit anything. What am I doing wrong?

Update

I think this may be down to the observables that are being fed into the Join statement, i.e., they originate from a switch, e.g.,

var left1 = Observable.Interval(TimeSpan.FromSeconds(5));
var left2 = Observable.Interval(TimeSpan.FromSeconds(7));

var left = left1.Select(x => left2).Switch();
var right = Observable.Interval(TimeSpan.FromSeconds(1));

var test = left.Join(right, _ => left.Publish().RefCount(), _ => Observable.Empty<Unit>(), (l, r) => (l, r))
    .Distinct(x => x.r)
    .Subscribe(x =>
    {
        Debug.WriteLine($"left: {x.l}, right: {x.r}");
    });

If I replace left with left in the join statement, things work as expected. So I must be misunderstanding how switch works in this context?


Solution

  •     static async Task Main()
        {
    
            var left = Observable.Interval(TimeSpan.FromSeconds(0.3));
            var right = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1)).Select(i => (char)('A'+i)).Take(5);
    
            right.WithLatestFrom(left).Subscribe(i => Console.WriteLine(i));
    
            Console.ReadLine();
        }
    

    gives the output:

    (A, 2)
    (B, 5)
    (C, 8)
    (D, 12)
    (E, 15)
    

    Looks like what you're expecting?