ios swift system.reactive rx-swift rx-cocoa

How to throttle observable until a condition becomes true


I have a collection view in my app that refreshes with animation when content is added or deleted. However, I don't want it to refresh while the user is scrolling, because that causes jerking. I want to refresh the collection view only once the user has finished scrolling, or when it isn't scrolling at all.

I have a data source driver, and I tried using filter to make it wait until the condition becomes true, but no luck.

This is the scrolling driver that I pass to the ViewModel:

let isScrollViewScrollingDriver = Observable.merge(
            gridCollectionView.rx.willBeginDragging.map { _ in true },
            gridCollectionView.rx.didEndDragging.map { _ in false }
        ).asDriver(onErrorJustReturn: false).startWith(false).distinctUntilChanged()

My ViewModel init in the view controller:

viewModel = ViewModel(
            photoLibraryService: PhotoLibraryService.shared,
            isGridViewScrolling: isScrollViewScrollingDriver,
            disposeBag: disposeBag
        )

My ViewModel:

let assetDriver = photoLibraryService.albumDriver.asObservable()
                .withLatestFrom(
                    isGridViewScrolling.asObservable().filter { $0 == false }
                ) { (arg0, arg1) in
                    return arg0
                }.flatMapLatest { libraryAlbum -> Observable<[LibraryAsset]> in
                    return photoLibraryService.convert(album: libraryAlbum)
                }.asDriver(onErrorJustReturn: []).startWith([]).distinctUntilChanged()

I then map assetDriver to a dataSourceDriver that drives my collection view.

What changes can I make to the assetDriver to make it wait for isGridViewScrolling to become false? Thanks.


Solution

  • It sounds like you need my stallUnless(_:initial:) operator. https://gist.github.com/danielt1263/2b624d7c925d8b7910ef2f1e5afe177b

    
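    import RxSwift

    extension ObservableType {
        /**
         Emits values immediately if the boundary sequence last emitted true, otherwise collects elements from the source sequence until the boundary sequence emits true then emits the collected elements.
         - parameter boundary: Triggering event sequence.
         - parameter initial: The initial value of the boundary
         - returns: An Observable sequence.
         */
        func stallUnless<O>(_ boundary: O, initial: Bool) -> Observable<Element> where O: ObservableType, O.Element == Bool {
            // Merge source values and boundary events into one stream of actions.
            // The boundary side stops once the source has completed.
            return Observable.merge(self.map(Action.value), boundary.startWith(initial).distinctUntilChanged().materialize().map(Action.trigger).takeUntil(self.takeLast(1)))
                // State: values held back, the latest boundary value, and what to emit now.
                .scan((buffer: [Element](), trigger: initial, out: [Element]()), accumulator: { current, new in
                    switch new {
                    case .value(let value):
                        // Pass the value through if the gate is open, otherwise buffer it.
                        return current.trigger ? (buffer: [], trigger: current.trigger, out: [value]) : (buffer: current.buffer + [value], trigger: current.trigger, out: [])
                    case .trigger(.next(let trigger)):
                        // Gate opened: flush the buffer. Gate closed: keep buffering.
                        return trigger ? (buffer: [], trigger: trigger, out: current.buffer) : (buffer: current.buffer, trigger: trigger, out: [])
                    case .trigger(.completed):
                        // Boundary completed: release whatever is buffered and stay open.
                        return (buffer: [], trigger: true, out: current.buffer)
                    case .trigger(.error(let error)):
                        throw error
                    }
                })
                .flatMap { $0.out.isEmpty ? Observable.empty() : Observable.from($0.out) }
        }
    }

    // Helper type used by stallUnless. Its definition is not shown in the
    // excerpt above, so the cases here are inferred from how they are used.
    private enum Action<Element> {
        case value(Element)
        case trigger(Event<Bool>)
    }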
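
As a rough sketch of how the operator might be wired up (this is not taken from the gist), the example below uses two PublishSubjects as hypothetical stand-ins for albumDriver and isGridViewScrolling, and assumes the stallUnless(_:initial:) extension is compiled into the same module. Because the operator lets values through while the boundary is true, the "is scrolling" flag is inverted first.

```swift
import RxSwift

// Hypothetical stand-ins for the streams in the question.
let albums = PublishSubject<String>()      // plays the role of photoLibraryService.albumDriver
let isScrolling = PublishSubject<Bool>()   // plays the role of isGridViewScrolling

let disposeBag = DisposeBag()
var received = [String]()

albums
    // The gate is open while the boundary is true, so pass "not scrolling".
    // initial: true means "not scrolling" until the first scroll event arrives.
    .stallUnless(isScrolling.map { !$0 }, initial: true)
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

albums.onNext("A")         // not scrolling: delivered immediately
isScrolling.onNext(true)   // user starts dragging: gate closes
albums.onNext("B")         // held back
albums.onNext("C")         // held back
isScrolling.onNext(false)  // scrolling ends: "B" and "C" are released in order

print(received)
```

In the question's ViewModel, the same idea would presumably amount to replacing the withLatestFrom call with `.stallUnless(isGridViewScrolling.asObservable().map { !$0 }, initial: true)` ahead of flatMapLatest. The withLatestFrom attempt cannot delay anything on its own: once the filtered stream has emitted false, every new album is paired with that stored false and passes straight through.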