Search code examples
c#system.reactiverx.net

Recombining elements from the same reactive stream


What I want to achieve can be described as follows:

  • I have a stream of samples, which are timestamped measurement values. This is the raw stream.
  • I am applying a filter on the raw stream, whereby I get a derived stream (it will be the hysteresis filter from this question, but for the sake of simplicity I am using a Where operator here)
  • To eliminate the big gaps resulting from slowly changing values, I am applying a Sample operator to the raw stream
  • I am merging both streams into a resulting stream

The concept looks like this:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();

var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));

new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);

await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);

The original source is hot. Without Distinct I am obviously getting duplicate values, with it it looks to yield what I am expecting to see.

Is there any better approach, give the fact, that the first derived stream is not periodical?


Solution

  • You could append the index in the source observable, and then apply the DistinctUntilChanged in the final merged observable.

    var withIndex = s.Select((x, i) => (Item : x, Index : i));
    var s1 = withIndex.Where(p => p.Item % 5 == 0);
    var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));
    
    new[] { s1, s2 }
        .Merge()
        .DistinctUntilChanged(p => p.Index) // discard duplicates
        .Select(p => p.Item) // discard the index
        .Subscribe(Console.WriteLine, cts.Token);
    

    I guess that the operator DistinctUntilChanged is more lightweight than the Distinct, because it caches only the latest element.