Search code examples
swiftreactive-programmingcombinepublisher

Creating a publisher that collects values after a trigger


Is there a way to turn on the collect() after it sees a certain value and then turn it off after it sees another?

I tried using map and filter upstream and that's totally wrong.

Here's my playground

import UIKit
import Foundation
import Combine

var subj = PassthroughSubject<Int, Never>()

for iter in 0...10 {
    DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(iter)) {
        print("send \(iter)")
        subj.send(iter)
    }
}

let queue = DispatchQueue(label: "Q")

let cancellable = subj.collect(.byTime(queue, .seconds(2))).sink(receiveCompletion: {
    print("finish")
    print($0)
}, receiveValue: {
    print($0)
})

/* my print
send 0
send 1
send 2
[0, 1]
send 3
send 4
[2, 3]
send 5 
send 6
[4, 5] // is there a way to only have it dispatch all the values after this point? After it sees a value of 5?
send 7
send 8
[6, 7]
send 9
send 10
[8, 9, 10]

*/


Solution

  • One solution to this would be to employ the drop(untilOutputFrom:) and prefix(untilOutputFrom:) operators.

    Consider the playground below.

    In the code I create a signal startEmitting that doesn't emit anything until it sees the value 5. Then it emits a "true" value and stops.

    Similarly the signal stopEmitting waits until it sees a 9 and when it does it emits a single value and stops.

    The combined signal drops every value until startEmitting fires, and using prefix only emits values until stopEmitting fires. The net effect is that the combined signal only starts when a 5 is emitted and stops when 9 is emitted.

    import UIKit
    import Foundation
    import Combine
    
    var subj = PassthroughSubject<Int, Never>()
    
    for iter in 0...10 {
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(iter)) {
            print("send \(iter)")
            subj.send(iter)
            
            if iter == 10 {
                subj.send(completion: .finished)
            }
        }
    }
    
    let startEmitting = subj
        .print("startEmtting")
        .contains(5)
    
    let stopEmitting = subj
        .print("stopEmitting")
        .contains(9)
    
    let cancellable = subj
        .drop(untilOutputFrom: startEmitting)
        .prefix(untilOutputFrom: stopEmitting)
        .collect()
        .sink {
        print("finish")
        print($0)
    } receiveValue: {
        print($0)
    }