What I want to achieve can be described as follows:
Where
operator here)Sample
operator to the raw streamThe concept looks like this:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();
var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));
new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
The original source is hot. Without Distinct
I am obviously getting duplicate values, with it it looks to yield what I am expecting to see.
Is there any better approach, give the fact, that the first derived stream is not periodical?
You could append the index in the source observable, and then apply the DistinctUntilChanged
in the final merged observable.
var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));
new[] { s1, s2 }
.Merge()
.DistinctUntilChanged(p => p.Index) // discard duplicates
.Select(p => p.Item) // discard the index
.Subscribe(Console.WriteLine, cts.Token);
I guess that the operator DistinctUntilChanged
is more lightweight than the Distinct
, because it caches only the latest element.