c# system.reactive rx.net

How to propagate an observable sequence, but only if it starts with a specific collection of values?


This is a "just for fun" question that extends and generalizes a recent Rx question by jackdry. The problem is how to implement an Rx operator that takes an IObservable<T> and:

  1. Forwards each element unchanged if the first elements of the sequence are equal to, and in the same order as, the elements of a given collection (ICollection<T>).
  2. Emits an empty sequence otherwise.

For example, given the collection of values [a, b, c]:

Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|

The signature of the requested operator:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default);
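
To pin down which values should come out (ignoring timing), the rules above can be modeled on an already-completed sequence. This is only a sketch: `PrefixModel` and `ExpectedOutput` are hypothetical names, not part of Rx, and the model says nothing about when elements are emitted or when the result completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PrefixModel
{
    // Hypothetical helper: the values IfFirstElements should emit for a source
    // that has already completed with the given elements.
    public static IReadOnlyList<T> ExpectedOutput<T>(
        IReadOnlyList<T> source,
        ICollection<T> expectedFirstElements,
        IEqualityComparer<T> comparer = default)
    {
        // Compare only the leading elements; a source that is too short cannot match.
        bool startsWithPrefix = source
            .Take(expectedFirstElements.Count)
            .SequenceEqual(expectedFirstElements, comparer);

        return startsWithPrefix ? source : Array.Empty<T>();
    }

    public static void Main()
    {
        var prefix = new[] { 'a', 'b', 'c' };

        // The four source sequences from the marble diagrams above.
        foreach (var source in new[] { "abcde", "apqr", "ab", "cabcd" })
        {
            var output = ExpectedOutput(source.ToCharArray(), prefix);
            Console.WriteLine($"{source} -> [{string.Join("", output)}]");
        }
        // Prints:
        // abcde -> [abcde]
        // apqr -> []
        // ab -> []
        // cabcd -> []
    }
}
```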

Solution

  • Here you go:

    public static IObservable<T> IfFirstElements<T>(
        this IObservable<T> source,
        ICollection<T> expectedFirstElements,
        IEqualityComparer<T> comparer = default) =>
    source
        .Publish(published =>
            from xs in published.Take(expectedFirstElements.Count).ToArray()
            from y in
                xs.SequenceEqual(expectedFirstElements, comparer)
                ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                : Observable.Empty<T>()
            select y);
    

    I was trying to make it more efficient by failing early, but every attempt made it less efficient.

    Here's my test code:

    new[] { 1, 2, 3, 4 }
        .ToObservable()
        .IfFirstElements(new[] { 1, 2, 3 })
        .Dump();
    
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Take(7)
        .Select(x => x + 1)
        .IfFirstElements(new long[] { 1, 2, 3 })
        .Dump();
    
    new[] { 2, 2, 3, 4 }
        .ToObservable()
        .IfFirstElements(new[] { 1, 2, 3 })
        .Dump();
    

    LINQPad is required to run the above code, because Dump() is a LINQPad extension method. The output was posted as an image:

    [image: output]
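
For anyone without LINQPad, a rough console equivalent might look like the sketch below. It assumes the System.Reactive NuGet package is referenced. `PrefixOperators`, `ConsoleDemo` and `Collect` are hypothetical names; `Collect` stands in for Dump() by blocking until the sequence completes and joining whatever it emitted. The interval is shortened to 100 ms so the demo finishes quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PrefixOperators
{
    // The first implementation from this answer, repeated so the sketch compiles on its own.
    public static IObservable<T> IfFirstElements<T>(
        this IObservable<T> source,
        ICollection<T> expectedFirstElements,
        IEqualityComparer<T> comparer = default) =>
        source
            .Publish(published =>
                from xs in published.Take(expectedFirstElements.Count).ToArray()
                from y in
                    xs.SequenceEqual(expectedFirstElements, comparer)
                    ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                    : Observable.Empty<T>()
                select y);
}

public static class ConsoleDemo
{
    // Hypothetical stand-in for Dump(): waits for completion, then returns
    // every emitted element as one comma-separated string.
    public static string Collect<T>(IObservable<T> source) =>
        string.Join(", ", source.ToList().Wait());

    public static void Main()
    {
        var matching = new[] { 1, 2, 3, 4 }
            .ToObservable()
            .IfFirstElements(new[] { 1, 2, 3 });
        Console.WriteLine($"[{Collect(matching)}]"); // [1, 2, 3, 4]

        var timed = Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Take(7)
            .Select(x => x + 1)
            .IfFirstElements(new long[] { 1, 2, 3 });
        Console.WriteLine($"[{Collect(timed)}]"); // [1, 2, 3, 4, 5, 6, 7]

        var mismatching = new[] { 2, 2, 3, 4 }
            .ToObservable()
            .IfFirstElements(new[] { 1, 2, 3 });
        Console.WriteLine($"[{Collect(mismatching)}]"); // []
    }
}
```

Blocking with Wait() is only reasonable in a throwaway demo like this one; real code would normally subscribe instead.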


    It's a little harder to get it to end early, but here it is:

    public static IObservable<T> IfFirstElements<T>(
        this IObservable<T> source,
        ICollection<T> expectedFirstElements,
        IEqualityComparer<T> comparer = default) =>
            source
                .Publish(published =>
                    from xs in
                        published
                            .Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
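                            // Stop as soon as any accumulated element differs from the expected one at the same position.
                            .TakeUntil(a => a
                                .Zip(expectedFirstElements, (m, n) => (comparer ?? EqualityComparer<T>.Default).Equals(m, n))
                                .Any(equal => !equal))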
                            .Take(expectedFirstElements.Count)
                            .LastAsync()
                    from y in
                        xs.SequenceEqual(expectedFirstElements, comparer)
                        ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                        : Observable.Empty<T>()
                    select y);
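
One way to check the early exit is to feed the operator from a Subject<T> and count how many elements get pushed before the output completes. The sketch below assumes the System.Reactive and System.Collections.Immutable packages, and a System.Reactive version that has the predicate overload of TakeUntil. `EarlyExitOperators`, `EarlyExitDemo` and `ElementsPushedBeforeCompletion` are hypothetical names. The operator is a copy of the one above, with the element comparison written via EqualityComparer<T>.Default.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class EarlyExitOperators
{
    // Copy of the early-terminating implementation, so the sketch compiles on its own.
    public static IObservable<T> IfFirstElements<T>(
        this IObservable<T> source,
        ICollection<T> expectedFirstElements,
        IEqualityComparer<T> comparer = default) =>
        source
            .Publish(published =>
                from xs in
                    published
                        .Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
                        .TakeUntil(a => a
                            .Zip(expectedFirstElements, (m, n) => (comparer ?? EqualityComparer<T>.Default).Equals(m, n))
                            .Any(equal => !equal))
                        .Take(expectedFirstElements.Count)
                        .LastAsync()
                from y in
                    xs.SequenceEqual(expectedFirstElements, comparer)
                    ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                    : Observable.Empty<T>()
                select y);
}

public static class EarlyExitDemo
{
    // Pushes values into a Subject one at a time and returns how many were
    // pushed before the operator's output signalled completion.
    // The subject itself never completes.
    public static int ElementsPushedBeforeCompletion<T>(
        IEnumerable<T> values, ICollection<T> expected)
    {
        var subject = new Subject<T>();
        var completed = false;
        var pushed = 0;

        using (subject.IfFirstElements(expected).Subscribe(_ => { }, () => completed = true))
        {
            foreach (var value in values)
            {
                if (completed) break;
                subject.OnNext(value);
                pushed++;
            }
        }
        return pushed;
    }

    public static void Main()
    {
        var expected = new[] { 'a', 'b', 'c' };
        Console.WriteLine(ElementsPushedBeforeCompletion("cabcd", expected)); // 1: fails on 'c'
        Console.WriteLine(ElementsPushedBeforeCompletion("apqr", expected));  // 2: fails on 'p'
        Console.WriteLine(ElementsPushedBeforeCompletion("abcde", expected)); // 5: prefix matches, output stays open
    }
}
```

Running the same harness against the first implementation should show it waiting for three elements before it gives up, since Take(expectedFirstElements.Count) has to fill up first.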