Search code examples
javareactive-programmingnetflixrx-java

Rx java Operators; Encapsulating data flow into custom Operators


Lets say I'm observing an observable in a very specific way.

    resultObservable = anotherObservable.filter(~Filter code~).take(15);  

I'd like to create a custom operator that combines two predefined operators like filter, and take. Such that it behaves like

    resultObservable = anotherObservable.lift(new FilterAndTake(15));  

or...

    resultObservable = anotherObservable.FilterAndTake(15);  

So far Im comfortable with writing a very specific operator that can do this. And I can lift that operator.

But, given my currently limited knowledge of rx java, this would involve re-writing the take and filter functionality every time I need to use it in a custom operator.

Doing this is fine, But I'd rather re-use pre-existing operators that are maintained by an open source community, as well as recycle operators I've created.

Something also tells me I lack adequate knowledge about operators and subscribers.

Can someone recommend tutorials that aren't rx-java documentation?
I say this because, while docs explain general concepts, it isolates the concepts and general contexts of their functionality leaving no examples to inspire more robust applications of RX java.

So essentailly

I'm trying to encapsulate custom-dataflows into representative operators. Does this functionality exist?


Solution

  • I'm not aware of some special function (or sugar) that composes Operator objects. But you can simply create a new Operator to compose existing Operators. Here's a working example of the FilterAndTake Operator:

    public class FilterAndTake<T> implements Observable.Operator<T, T> {
    
        private OperatorFilter<T> filter;
        private OperatorTake<T> take;
    
        public FilterAndTake(Func1<? super T, Boolean> predicate, int n) {
            this.filter = new OperatorFilter<T>(predicate);
            this.take = new OperatorTake<T>(n);
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            return filter.call(take.call(child));
        }
    }
    

    And then you can use it as follows:

    public static void main(String[] args) {
        Observable<Integer> xs = Observable.range(1, 8);
    
        Func1<Integer, Boolean> predicate = new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer x) {
                return x % 2 == 0;
            }
        };
    
        Action1<Integer> action = new Action1<Integer>() {
            @Override
            public void call(Integer x) {
                System.out.println("> " + x);
            }
        };
    
        xs.lift(new FilterAndTake<Integer>(predicate, 2)).subscribe(action);
    }