Search code examples
android, rx-java, rx-android

How can I continue an Rx chain in different ways based on a property?


I have a method in which, based on a value carried by the Flowable's data, I need to continue the Rx chain in one of two ways. Specifically, if the isOnline property of the data object is true, I need to call scan(initial, selector); if it is false, I need to call scan(selector).


 /**
  * Entry point of the chain: maps each emission of {@code info} and hands the mapped item to
  * {@code call(...)} through {@code switchMap}.
  *
  * <p>NOTE: this snippet is abbreviated; the {@code map()} argument is omitted, so it does not
  * compile as shown.
  *
  * @param info    upstream of {@code Info} items
  * @param initial value forwarded unchanged to {@code call(...)}
  * @return the Flowable produced by the {@code switchMap} over {@code call(...)}
  */
 @NotNull
    public Flowable<Data> initialCall(
            @NotNull Flowable<Info> info, Data initial) {
        return  info
                // Mapper argument is elided in this snippet.
                .map()
                // Wrap the mapped item in a single-element Flowable and pass it to call()
                // together with the initial value and the item's isOnline flag.
                .switchMap(it -> call(Flowable.just(it),initial, it.isOnline));
    }



   /**
    * Combines {@code a} with {@code b} via {@code combineLatest} and then runs the remaining
    * operator chain.
    *
    * <p>NOTE: this snippet is abbreviated and does not compile as shown: {@code b} is not declared
    * here, the combiner body is elided, and several operator arguments are omitted.
    *
    * @param a        first source passed to {@code combineLatest}
    * @param initial  intended seed for the {@code scan(initial, selector)} variant
    * @param isOnline intended switch between the seeded and the unseeded {@code scan}
    */
   private Flowable<Data> call (
            Flowable<A> a, 
            Data initial, boolean isOnline
    ) {
        return Flowable.combineLatest(
                a,
                b,
                (a, b) -> {
                    return ....;
                })
                .switchMap()
    // Open question: at this point, depending on isOnline, the chain should apply either
    // scan(initial, selector) or scan(selector), and then continue with the operators below.
                .map()
                .distinctUntilChanged()
                .toObservable()
                .compose()
                .compose()
                .toFlowable(BUFFER)
    }

Solution

  • Is this what you want? Based on isOnline, a scan operator is applied either with or without a seed value.

    import io.reactivex.rxjava3.core.BackpressureStrategy;
    import io.reactivex.rxjava3.core.Flowable;
    import io.reactivex.rxjava3.core.FlowableTransformer;
    import org.junit.jupiter.api.Test;
    
    /**
     * Self-contained demonstration of branching an Rx chain on a flag: depending on
     * {@code isOnline}, the chain applies {@code scan} either with or without a seed value. The
     * branch is packaged as a {@link FlowableTransformer} so it can be plugged into the chain with
     * {@code compose} without breaking the fluent call sequence.
     */
    class So65349760 {

      /**
       * Chooses the {@code scan} variant to apply to the upstream.
       *
       * @param isOnline     {@code true} to use the seeded {@code scan(initialValue, accumulator)},
       *                     {@code false} to use the unseeded {@code scan(accumulator)}
       * @param initialValue seed for the seeded variant; not used when {@code isOnline} is
       *                     {@code false}
       * @return a transformer that applies the selected {@code scan} to a {@code Flowable<Data>}
       */
      private static FlowableTransformer<Data, Data> scan(boolean isOnline, Data initialValue) {
        if (isOnline) {
          return upstream -> upstream.scan(initialValue, So65349760::accumulate);
        }
        return upstream -> upstream.scan(So65349760::accumulate);
      }

      /**
       * Placeholder accumulator ("selector") shared by both {@code scan} variants so that they
       * cannot drift apart. It ignores both arguments and returns a fresh {@code Data}; replace
       * the body with the real reduction logic.
       */
      private static Data accumulate(Data previous, Data current) {
        return new Data();
      }

      @Test
      void so65349760() {
        Flowable<Integer> a = Flowable.fromCallable(() -> 1);
        Flowable<String> b = Flowable.fromCallable(() -> "42");

        Data seed = new Data();

        // Unseeded scan: one upstream item in, one item out.
        call(a, b, seed, false).test().assertValueCount(1);
        // Seeded scan: the seed is emitted first, followed by one accumulated item.
        call(a, b, seed, true).test().assertValueCount(2);
      }

      /**
       * Builds the demo chain: combines {@code a} and {@code b}, switches to a single-element
       * {@code Data} stream, applies the flag-dependent {@code scan}, and then continues with the
       * remaining operators.
       *
       * @param a        first source for {@code combineLatest}
       * @param b        second source for {@code combineLatest}
       * @param init     seed handed to the seeded {@code scan} variant
       * @param isOnline selects the seeded ({@code true}) or unseeded ({@code false}) variant
       * @return the resulting {@code Flowable<Data>}
       */
      private <A, B> Flowable<Data> call(
          Flowable<A> a, Flowable<B> b, Data init, boolean isOnline) {
        return Flowable.combineLatest(a, b, (v1, v2) -> 42)
            .switchMap(integer -> Flowable.just(new Data()))
            // The only flag-dependent step; everything after it is common to both branches.
            .compose(scan(isOnline, init))
            // Identity mapping: stands in for the map step of the original chain.
            .map(d -> d)
            .distinctUntilChanged()
            .toObservable()
            .toFlowable(BackpressureStrategy.BUFFER);
      }

      /** Empty stand-in for the domain type flowing through the chain. */
      private static class Data {}
    }