Tags: c#, .net, system.reactive, reactive-streams, rx.net

How to transform observable based on predicate involving first element


I'm trying to create an Rx.NET operator that takes an IObservable<string> and:

  • Forwards each element unchanged if the first element is "a"
  • Emits just a completion signal otherwise

For example:

-a-b-c-d-|- --> -a-b-c-d-|-

-b-c-d-|- --> -|-

How can I do this?


Solution

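  • Here's a version that definitely doesn't have a race condition. Publish shares a single subscription to the source, so the first element can be inspected and the rest forwarded without subscribing to the source twice: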

    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
        source
            .Publish(published =>
                from x in published.Take(1)
                from y in
                    x.Equals(expectedFirstElement)
                    ? published.StartWith(x)
                    : Observable.Empty<T>()
                select y);
    

    Here's the method syntax version:

    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
        source
            .Publish(published =>
                published
                    .Take(1)
                    .SelectMany(x =>
                        x.Equals(expectedFirstElement)
                        ? published.StartWith(x)
                        : Observable.Empty<T>()));
    

    I prefer the query syntax, but hey...
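
To check the operator against the two marble diagrams in the question, something like the following sketch can be used. It repeats the method syntax version so that it compiles on its own, and it assumes the System.Reactive NuGet package is referenced. The `Demo` class and its `Run` helper are hypothetical names introduced only for this illustration, and the cold `ToObservable()` source is an assumption standing in for whatever stream you actually have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Same operator as the method syntax version above, repeated so the sketch is self-contained.
    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
        source
            .Publish(published =>
                published
                    .Take(1)
                    .SelectMany(x =>
                        x.Equals(expectedFirstElement)
                        ? published.StartWith(x)
                        : Observable.Empty<T>()));
}

public static class Demo
{
    // Hypothetical helper: feeds the items through IfFirstElement("a")
    // and blocks until the resulting sequence completes, collecting its output.
    public static IList<string> Run(params string[] items) =>
        items.ToObservable().IfFirstElement("a").ToList().Wait();

    public static void Main()
    {
        // -a-b-c-d-|-  should pass through unchanged.
        Console.WriteLine(string.Join("-", Run("a", "b", "c", "d"))); // expected: a-b-c-d

        // -b-c-d-|-  should produce only a completion signal.
        Console.WriteLine(Run("b", "c", "d").Count); // expected: 0
    }
}
```

Note that `x.Equals(...)` throws if the first element is null. If your source can emit nulls, `EqualityComparer<T>.Default.Equals(x, expectedFirstElement)` from System.Collections.Generic is a null-safe alternative.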