kotlin, rx-java2, rx-kotlin2

RxJava2: onComplete not called with flatMapIterable


Here is a short snippet of code:

    val subject = BehaviorSubject.createDefault(emptyList<Int>())
    subject.onNext(Arrays.asList(1, 2, 3))
    subject.flatMapIterable { list: List<Int> -> list }
            .subscribeBy(
                    onNext = { l("on next", it) },
                    onComplete = { l("on complete") }
            )

Why is onComplete not called here? What should I do to make this code work? In my original code I cannot use the .toList() method.


Solution

  • A BehaviorSubject in this form is an infinite source: it never completes unless onComplete is called on it. flatMapIterable completes only when its upstream completes, so your onComplete handler is never invoked.

    So either you complete the BehaviorSubject:

    val subject = BehaviorSubject.createDefault(emptyList<Int>())
    subject.onNext(Arrays.asList(1, 2, 3))
    subject.flatMapIterable { list: List<Int> -> list }
           .subscribeBy(
                    onNext = { l("on next", it) },
                    onComplete = { l("on complete") }
           )
    
    subject.onComplete() // <-----------------------------------------------------
    

    or you take at most one item from it:

    val subject = BehaviorSubject.createDefault(emptyList<Int>())
    subject.onNext(Arrays.asList(1, 2, 3))
    subject.take(1) // <----------------------------------------------------------
           .flatMapIterable { list: List<Int> -> list }
           .subscribeBy(
                    onNext = { l("on next", it) },
                    onComplete = { l("on complete") }
           )
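
To compare the three cases side by side, here is a minimal sketch that records the events instead of logging them. It assumes RxJava 2 (the `io.reactivex` package) is on the classpath and uses plain `subscribe` rather than RxKotlin's `subscribeBy`. The helper name `collectEvents` and its two flags are hypothetical, introduced only for this illustration. No schedulers are involved, so everything runs synchronously on the calling thread.

```kotlin
import io.reactivex.subjects.BehaviorSubject

// Hypothetical helper (not part of RxJava): subscribes to the subject through
// flatMapIterable and returns every event the subscriber received.
fun collectEvents(completeSource: Boolean, takeOne: Boolean): List<String> {
    val events = mutableListOf<String>()

    val subject = BehaviorSubject.createDefault(emptyList<Int>())
    subject.onNext(listOf(1, 2, 3)) // replaces the default as the current value

    // take(1) turns the infinite subject into a source that completes
    // right after its first (current) item.
    val source = if (takeOne) subject.take(1) else subject

    source.flatMapIterable { list: List<Int> -> list }
        .subscribe(
            { item -> events.add("on next $item") },
            { error -> events.add("on error $error") },
            { events.add("on complete") }
        )

    // Completing the subject after subscribing propagates onComplete downstream.
    if (completeSource) subject.onComplete()

    return events
}

fun main() {
    // Original code: the subject never completes, so no "on complete".
    println(collectEvents(completeSource = false, takeOne = false))
    // Option 1: complete the subject explicitly.
    println(collectEvents(completeSource = true, takeOne = false))
    // Option 2: limit the stream with take(1).
    println(collectEvents(completeSource = false, takeOne = true))
}
```

Only the last two calls end with "on complete". One caveat on the first option: call `subject.onComplete()` after subscribing, because a BehaviorSubject that has already terminated delivers only the terminal event to late subscribers, not its last value.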