swift, reactive-programming, rx-swift

RxSwift event fired twice


I set up this sandbox from a piece of code that combines internal and external actions. I simplified it as much as possible while still reproducing the issue.

import PlaygroundSupport
import RxSwift

class Sandbox {
    let publisher: PublishSubject<Int>

    private let disposeBag = DisposeBag()

    init() {
        self.publisher = PublishSubject()

        let publisher2 = publisher.debug()
        let publisher3 = PublishSubject<Int>()
        let publisherMerge = PublishSubject.merge([publisher2, publisher3])

        let operation = publisherMerge
            .map { $0 + 2 }

        let operation2 = operation
            .map { $0 + 3 }

        // Will never fire on the sandbox
        operation
            .filter { $0 < 0 }
            .flatMapLatest(Sandbox.doSomething)
            .subscribe { print("Operation ", $0) }
            .disposed(by: disposeBag)

        operation2
            .subscribe { print("Operation2 ", $0) }
            .disposed(by: disposeBag)
    }

    static func doSomething(value: Int) -> Observable<Int> {
        return .just(value)
    }
}

let disposeBag = DisposeBag()
let sandbox = Sandbox()

Observable<Int>
    .interval(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe(onNext: { _ in sandbox.publisher.onNext(0) })
    .disposed(by: disposeBag)

PlaygroundPage.current.needsIndefiniteExecution = true
 

I'm confused because each event is triggered twice:

2022-11-07 21:16:15.292: Sandbox.playground:12 (init()) -> subscribed
2022-11-07 21:16:15.293: Sandbox.playground:12 (init()) -> subscribed
2022-11-07 21:16:18.301: Sandbox.playground:12 (init()) -> Event next(0)
2022-11-07 21:16:18.302: Sandbox.playground:12 (init()) -> Event next(0)
Operation2  next(5)

Any idea why this is happening, and how to ensure that only one event is fired?

EDIT 2022-11-08T02:49:37+00:00

A better example to illustrate what I am trying to achieve:

import Foundation
import PlaygroundSupport
import RxSwift

struct ViewModel: Equatable {
    enum State: Equatable {
        case initialized
    }
    
    let state: State
}

enum Reducer {
    enum Action {
        case dummyReducerAction(String)
    }

    enum Effect {
        case dummyEffect
    }

    struct State: Equatable {
        let viewModel: ViewModel
        let effect: Effect?
    }

    static func reduce(state: State, action: Action) -> State {
        let viewModel: ViewModel = state.viewModel
        let _: Effect? = state.effect
        let viewModelState: ViewModel.State = viewModel.state
        let noChange = State(viewModel: viewModel, effect: nil)

        switch (action, viewModelState) {
        case (.dummyReducerAction(let s), _):
            print(s, Date())
            return noChange
        }
    }

    static func fromAction(_ action: Interactor.Action) -> Action {
        switch action {
        case .dummyAction:
            return .dummyReducerAction("fromAction")
        }
    }

    static func performAsyncSideEffect(effect: Effect) -> Observable<Action> {
        switch effect {
        case .dummyEffect:
            return .just(.dummyReducerAction("performSideEffect"))
        }
    }
}

class Interactor {
    enum Action {
        case dummyAction
    }

    let action: PublishSubject<Action>
    let viewModel: BehaviorSubject<ViewModel>

    private let disposeBag = DisposeBag()

    init() {
        self.action = PublishSubject()
        
        let initialViewModel = ViewModel(state: .initialized)
        let initialReducerState = Reducer.State(viewModel: initialViewModel, effect: nil)
        
        let externalAction = action.map(Reducer.fromAction).debug()
        let internalAction = PublishSubject<Reducer.Action>()
        let allActions = PublishSubject.merge([externalAction, internalAction])
        
        self.viewModel = BehaviorSubject(value: initialViewModel)
        
        let reducerState = allActions
            .scan(initialReducerState, accumulator: Reducer.reduce)
        
        let viewModelObservable = reducerState
            .map { $0.viewModel }
            .distinctUntilChanged()
        
        reducerState
            .compactMap { $0.effect }
            .flatMapLatest(Reducer.performAsyncSideEffect)
            .subscribe(internalAction)
            .disposed(by: disposeBag)
        
        viewModelObservable
            .subscribe(onNext: viewModel.onNext)
            .disposed(by: disposeBag)
    }
}

let disposeBag = DisposeBag()
let interactor = Interactor()

Observable<Int>
    .interval(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(onNext: { _ in interactor.action.onNext(.dummyAction) })
    .disposed(by: disposeBag)

PlaygroundPage.current.needsIndefiniteExecution = true

Even with share(), the reducer is fired twice and prints the action content twice.

With share():

2022-11-08 11:46:05.298: Sandbox.playground:71 (init()) -> subscribed
2022-11-08 11:46:10.310: Sandbox.playground:71 (init()) -> Event next(dummyReducerAction("fromAction"))
fromAction 2022-11-08 02:46:10 +0000
fromAction 2022-11-08 02:46:10 +0000

Without share():

2022-11-08 11:56:33.369: Sandbox.playground:71 (init()) -> subscribed
2022-11-08 11:56:33.370: Sandbox.playground:71 (init()) -> subscribed
2022-11-08 11:56:38.383: Sandbox.playground:71 (init()) -> Event next(dummyReducerAction("fromAction"))
fromAction 2022-11-08 02:56:38 +0000
2022-11-08 11:56:38.385: Sandbox.playground:71 (init()) -> Event next(dummyReducerAction("fromAction"))
fromAction 2022-11-08 02:56:38 +0000

Solution

  • Every subscription gets its own stream. Since you are subscribing twice to the operation observable (once through operation2 and once directly) you get two subscribe requests and two next events every time it emits a value.

    If you don't want that, the solution is to use the .share() operator.

    let operation = publisherMerge
        .map { $0 + 2 }
        .share()
    

    I'm not sure that it's actually a problem though since there is no state depending on number of subscriptions that I can see.
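
    To make the per-subscription behaviour concrete, here is a minimal sketch, not the asker's code. The helper name countMapCalls and its shared parameter are made up for illustration. It counts how often a map closure runs when two subscribers are attached, with and without share(), assuming RxSwift is available to the playground or package.

```swift
import RxSwift

// Hypothetical helper: returns how many times the map closure ran
// for a single emitted value when two subscribers are attached.
func countMapCalls(shared: Bool) -> Int {
    let source = PublishSubject<Int>()
    let bag = DisposeBag()
    var calls = 0

    let mapped = source.map { value -> Int in
        calls += 1          // side effect that reveals each execution
        return value + 2
    }
    // With share(), both subscribers reuse one upstream subscription.
    let operation = shared ? mapped.share() : mapped

    operation.subscribe(onNext: { _ in }).disposed(by: bag)
    operation.subscribe(onNext: { _ in }).disposed(by: bag)

    source.onNext(0)
    return calls
}

print(countMapCalls(shared: false)) // 2
print(countMapCalls(shared: true))  // 1
```

    Without share() the closure runs once per subscriber. With share() it runs once per emitted value, and both subscribers receive the same result.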

    EDIT

    Again, the problem is the same; you just changed which observable is being subscribed to twice.

    Maybe it will help if I explain how I figure out where the share() should go.

    1. Find the debug() statement. I see that it's on externalAction.
    2. Is externalAction being shared? No, it's only being used by allActions.
    3. Is allActions being shared? No, it's only being used by reducerState.
    4. Is reducerState being shared? Yes, it's being used by viewModelObservable and it's explicitly subscribed to.
    5. Put the share on that observable.

    let reducerState = allActions
        .scan(initialReducerState, accumulator: Reducer.reduce)
        .share()
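
    The placement matters. The "with share()" log in the question (one subscription, one next event, but two reducer prints) is consistent with share() sitting somewhere upstream of scan, although the question does not say where it was placed, so treat that as an inference. The sketch below uses a hypothetical helper, countReducerCalls, and a toy Int reducer rather than the asker's types, and compares the two placements.

```swift
import RxSwift

// Hypothetical helper: counts accumulator calls for one action
// when the scanned state has two downstream subscribers.
func countReducerCalls(shareAfterScan: Bool) -> Int {
    let actions = PublishSubject<Int>()
    let bag = DisposeBag()
    var reducerCalls = 0

    // Toy reducer standing in for Reducer.reduce.
    let reduce: (Int, Int) -> Int = { state, action in
        reducerCalls += 1
        return state + action
    }

    let state: Observable<Int> = shareAfterScan
        ? actions.scan(0, accumulator: reduce).share()   // scan runs once
        : actions.share().scan(0, accumulator: reduce)   // scan runs per subscriber

    // Two consumers of the same state, like viewModelObservable
    // and the effect pipeline in the question.
    state.map { $0 * 2 }.subscribe(onNext: { _ in }).disposed(by: bag)
    state.filter { $0 > 100 }.subscribe(onNext: { _ in }).disposed(by: bag)

    actions.onNext(1)
    return reducerCalls
}

print(countReducerCalls(shareAfterScan: false)) // 2
print(countReducerCalls(shareAfterScan: true))  // 1
```

    Sharing upstream of scan only deduplicates the source subscription. Each subscriber still builds its own scan, so the reducer runs once per subscriber.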
    

    No matter how complex the example gets, the solution is to put a share() (or share(replay: 1)) on every Observable that is used in more than one place.
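
    As for choosing between the two variants, the difference shows up for subscribers that arrive late. The following sketch uses a hypothetical helper, lateSubscriberValues, and assumes the default .whileConnected scope. It records what a second subscriber sees when it subscribes after the first value has already been emitted.

```swift
import RxSwift

// Hypothetical helper: returns the values seen by a subscriber that
// attaches after the source has already emitted its first value.
func lateSubscriberValues(replay: Int) -> [Int] {
    let source = PublishSubject<Int>()
    let bag = DisposeBag()
    let shared = source.share(replay: replay)
    var received: [Int] = []

    // First subscriber keeps the shared connection alive.
    shared.subscribe(onNext: { _ in }).disposed(by: bag)
    source.onNext(1)

    // Late subscriber: only share(replay: 1) hands it the last value.
    shared.subscribe(onNext: { received.append($0) }).disposed(by: bag)
    source.onNext(2)

    return received
}

print(lateSubscriberValues(replay: 0)) // [2]
print(lateSubscriberValues(replay: 1)) // [1, 2]
```

    share(replay: 1) is usually the better fit for state-like streams, where a new subscriber needs the current value. Plain share() suits event-like streams, where old values should not be redelivered.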