I want to create an observable that emits whenever the right observable emits a value and pairs that value with the latest value from the left observable, i.e., as described in the marble diagram taken from combining-sequences.
Reading through the documentation at the above link suggests that this can be achieved with a left duration selector whose windows do not overlap and a right duration selector that returns immediately. So I tried this:
leftObservable.Join(rightObservable, _ => leftObservable.Publish().RefCount(), _ => Observable.Empty<Unit>(), (l,r) => (l,r))
but that doesn't seem to emit anything. What am I doing wrong?
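One possible reading of the documentation's idea, as a hedged sketch rather than a confirmed fix: calling Publish().RefCount() inside the duration selector builds a fresh connectable on every call, so each window gets its own subscription to the left source instead of sharing the one Join is listening to. The sketch below shares a single subscription through the Publish(selector) overload, so a left window closes exactly when the next left value arrives. The names JoinLatestSketch, JoinLatest and Run are made up for illustration, and HistoricalScheduler is used only so the timing is virtual and repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class JoinLatestSketch
{
    // Hypothetical helper: pairs each right value with the most recent left value.
    // One shared subscription to left feeds both Join and its left duration selector.
    public static IObservable<(TLeft l, TRight r)> JoinLatest<TLeft, TRight>(
        IObservable<TLeft> left, IObservable<TRight> right) =>
        left.Publish(shared =>
            shared.Join(
                right,
                _ => shared,                   // left window closes on the next left value
                _ => Observable.Empty<Unit>(), // right window closes immediately
                (l, r) => (l, r)));

    // Runs the helper on virtual time: left ticks every 5 s, right every 3 s.
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler();
        var left = Observable.Interval(TimeSpan.FromSeconds(5), scheduler);
        var right = Observable.Interval(TimeSpan.FromSeconds(3), scheduler);
        var seen = new List<string>();
        using (JoinLatest(left, right).Subscribe(x => seen.Add($"left: {x.l}, right: {x.r}")))
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(13));
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Right values that arrive before the first left value find no open window and are dropped. If left and right run on different threads, a right value could in principle land between a new left value opening its window and the previous window closing, so keeping a Distinct on the right value as a guard may still be reasonable.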
Update
I think this may be down to the observables that are being fed into the Join statement, i.e., they originate from a switch, e.g.,
var left1 = Observable.Interval(TimeSpan.FromSeconds(5));
var left2 = Observable.Interval(TimeSpan.FromSeconds(7));
var left = left1.Select(x => left2).Switch();
var right = Observable.Interval(TimeSpan.FromSeconds(1));
var test = left.Join(right, _ => left.Publish().RefCount(), _ => Observable.Empty<Unit>(), (l, r) => (l, r))
    .Distinct(x => x.r)
    .Subscribe(x =>
    {
        Debug.WriteLine($"left: {x.l}, right: {x.r}");
    });
If I replace left with left in the join statement, things work as expected. So I must be misunderstanding how switch works in this context?
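One thing that may be worth checking in the snippet above, independent of Join: Observable.Interval is cold, so every time left1 ticks, Switch unsubscribes from the current left2 subscription and subscribes again, which restarts the 7-second timer. Because left1 ticks every 5 seconds, the inner sequence is always replaced before its first value is due, so left would never emit at all. The sketch below reproduces this on virtual time; SwitchSketch and Collect are hypothetical names, and HistoricalScheduler is used only to make the timing repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SwitchSketch
{
    // Builds outer.Select(_ => inner).Switch() from two cold intervals and
    // collects everything it emits while virtual time advances by runFor.
    public static List<long> Collect(TimeSpan outerPeriod, TimeSpan innerPeriod, TimeSpan runFor)
    {
        var scheduler = new HistoricalScheduler();
        var outer = Observable.Interval(outerPeriod, scheduler);
        var inner = Observable.Interval(innerPeriod, scheduler);
        var switched = outer.Select(_ => inner).Switch();

        var seen = new List<long>();
        using (switched.Subscribe(x => seen.Add(x)))
        {
            scheduler.AdvanceBy(runFor);
        }
        return seen;
    }

    public static void Main()
    {
        // Same periods as the question: the inner 7 s timer is restarted every 5 s.
        var starved = Collect(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(7), TimeSpan.FromSeconds(60));
        Console.WriteLine($"5 s outer, 7 s inner: {starved.Count} values");

        // An inner period shorter than the outer one does get through.
        var flowing = Collect(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(16));
        Console.WriteLine($"5 s outer, 2 s inner: {string.Join(", ", flowing)}");
    }
}
```

With the question's periods the collected list stays empty, which would explain why nothing reaches the subscriber no matter how the Join windows are set up. With a 2-second inner period the inner counter restarts from 0 after each switch.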
static async Task Main()
{
    var left = Observable.Interval(TimeSpan.FromSeconds(0.3));
    var right = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Select(i => (char)('A' + i)).Take(5);
    right.WithLatestFrom(left).Subscribe(i => Console.WriteLine(i));
    Console.ReadLine();
}
gives the output:
(A, 2)
(B, 5)
(C, 8)
(D, 12)
(E, 15)
Looks like what you're expecting?
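If the (l, r) tuple shape from the question is wanted, WithLatestFrom also has an overload that takes a result selector. The following is a minimal sketch under assumed periods (left every 5 seconds, right every 3 seconds); WithLatestFromSketch and Run are illustrative names, and HistoricalScheduler is used only so the timing is virtual and repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class WithLatestFromSketch
{
    // right drives the output; each right value is paired with the latest left value.
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler();
        var left = Observable.Interval(TimeSpan.FromSeconds(5), scheduler);
        var right = Observable.Interval(TimeSpan.FromSeconds(3), scheduler);

        // The result selector receives (right, left); swap them into (l, r).
        var paired = right.WithLatestFrom(left, (r, l) => (l, r));

        var seen = new List<string>();
        using (paired.Subscribe(x => seen.Add($"left: {x.l}, right: {x.r}")))
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(13));
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Here the right value at 3 seconds is dropped because left has not emitted yet; the values at 6 and 9 seconds are paired with left 0, and the one at 12 seconds with left 1. The same caveat applies to a switched left source: if it never emits, WithLatestFrom emits nothing either.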