This is a "just for fun" question, that extends and generalizes a recent Rx question by jackdry. The problem here is how to implement an Rx operator that takes an IObservable<T>
and:
ICollection<T>
).For example given the collection of values [a, b, c]:
Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|
The signature of the requested operator:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default);
Here you go:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in published.Take(expectedFirstElements.Count).ToArray()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
I was trying to make it more efficient by failing early, but every attempt made it less efficient.
Here's my test code:
new[] { 1, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(7)
.Select(x => x + 1)
.IfFirstElements(new long[] { 1, 2, 3 })
.Dump();
new[] { 2, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
LINQPad is requried to run the above code to get this output:
It's a little harder to get it to end early, but here it is:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in
published
.Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
.TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
.Take(expectedFirstElements.Count)
.LastAsync()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);