rx-swift

How can I check all historic values in a ReplaySubject in RxSwift?


I have a simple struct Foo:

struct Foo {
    let bar: String
}

Now I create an unbounded ReplaySubject of Foos:

let subject = ReplaySubject<Foo>.createUnbounded()

How can I now check whether the (unterminated) stream has emitted a Foo whose bar equals "abc"? (It could be the 1st, 3rd, or 20th element.)


Solution

  • First, this is kind of a crazy request. When working with Rx, you should not be thinking about "what was"; you should be thinking about what is always the case, that is, about invariants...

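    That said, the operator below emits the index of every element that satisfies a predicate. Because the subject can emit events at any time, the operator works in real time: it emits indexes for the replayed elements and keeps emitting them for matching elements that arrive later. It can be used like this: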

    let indexes = subject.indexOfElementSatisfying { $0.bar == "abc" }
    

    Here it is:

    extension ObservableConvertibleType {
    
        /**
         Emits the indexes of all values in the stream that satisfy the predicate.
    
         - parameter pred: The predicate that determines whether a value satisfies the condition.
         - returns: An observable sequence of the zero-based indexes of those elements.
         - note: In RxSwift 5 and later, the associated type `E` is named `Element`.
         */
        func indexOfElementSatisfying(_ pred: @escaping (E) throws -> Bool) -> Observable<Int> {
            return asObservable()
                .enumerated()
                .filter { try pred($0.element) }
                .map { $0.index }
        }
    }
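
To make the replay behaviour concrete, here is a minimal sketch that writes the same pipeline inline (so it does not depend on whether the associated type is called `E` or `Element`) and adds a simple "has any element matched yet?" check. The variable names `found`, `hasMatch`, `indexSubscription` and `matchSubscription` are illustrative and not part of the original answer, and the sketch assumes that ReplaySubject replays its buffer synchronously when you subscribe.

```swift
import RxSwift

struct Foo {
    let bar: String
}

let subject = ReplaySubject<Foo>.createUnbounded()

// Two elements are emitted before anyone subscribes.
subject.onNext(Foo(bar: "xyz"))   // index 0
subject.onNext(Foo(bar: "abc"))   // index 1

// Same pipeline as indexOfElementSatisfying, written inline.
// enumerated() pairs each element with its zero-based index.
var found: [Int] = []
let indexSubscription = subject
    .enumerated()
    .filter { $0.element.bar == "abc" }
    .map { $0.index }
    .subscribe(onNext: { found.append($0) })

// Existence check (an assumption about what the asker wants):
// take(1) completes after the first match, replayed or live.
// It can only ever report "yes"; an unterminated stream never
// lets you conclude "no".
var hasMatch = false
let matchSubscription = subject
    .filter { $0.bar == "abc" }
    .take(1)
    .subscribe(onNext: { _ in hasMatch = true })

// A later element is picked up as well.
subject.onNext(Foo(bar: "abc"))   // index 2

print(found, hasMatch)

indexSubscription.dispose()
matchSubscription.dispose()
```

Under those assumptions, `found` should end up holding 1 and 2: the replayed match and the live one. `hasMatch` should already be true right after subscribing, because the second buffered element matches.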