rx-java corecursion

Observable.Generate in RxJava?


.NET Reactive Extensions has a neat method for generating sequences via corecursion, called Observable.Generate.

Is there an analogous method in RxJava that allows data generation via corecursion? If not, could it be implemented on top of existing methods?


Solution

  • It's not an exact match, but we have SyncOnSubscribe (and AsyncOnSubscribe), which can generate values from a piece of state. For example:

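    import static org.mockito.Mockito.*;

    import org.junit.Test;
    import org.mockito.InOrder;

    import rx.Observable;
    import rx.Observable.OnSubscribe;
    import rx.Observer;
    import rx.functions.Func0;
    import rx.functions.Func2;
    import rx.observables.SyncOnSubscribe;

    @Test
    public void testRange() {
        final int start = 1;
        final int count = 4000;
        OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(
            // Supplies the initial state for each subscriber.
            new Func0<Integer>() {
                @Override
                public Integer call() {
                    return start;
                }
            },
            // Emits the current state and returns the next one.
            new Func2<Integer, Observer<? super Integer>, Integer>() {
                @Override
                public Integer call(Integer state, Observer<? super Integer> subscriber) {
                    subscriber.onNext(state);
                    // Since start is 1, reaching count means count items were emitted.
                    if (state == count) {
                        subscriber.onCompleted();
                    }
                    return state + 1;
                }
            });
    
        @SuppressWarnings("unchecked")
        Observer<Object> o = mock(Observer.class);
        InOrder inOrder = inOrder(o);
    
        Observable.create(os).subscribe(o);
    
        // No error of any kind is expected.
        verify(o, never()).onError(any(Throwable.class));
        inOrder.verify(o, times(count)).onNext(any(Integer.class));
        inOrder.verify(o).onCompleted();
        inOrder.verifyNoMoreInteractions();
    }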
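
For comparison with Observable.Generate, which takes an initial state, a condition, an iterate function and a result selector, here is a minimal plain-Java sketch of the same unfold. It uses no Rx types at all. The names `GenerateSketch`, `step` and `generate` are hypothetical, and the `onNext` and `onCompleted` callbacks only stand in for a subscriber. The `step` method is deliberately shaped like the `Func2` above: it takes the current state, emits at most one item or completes, and returns the next state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class GenerateSketch {

    /**
     * One step of a Generate-style unfold (hypothetical helper).
     * Signals completion if the condition fails; otherwise emits one item
     * derived from the state and returns the next state.
     */
    static <S, T> S step(S state,
                         Predicate<S> condition,
                         UnaryOperator<S> iterate,
                         Function<S, T> resultSelector,
                         Consumer<T> onNext,
                         Runnable onCompleted) {
        if (!condition.test(state)) {
            onCompleted.run();          // the sequence ends once the condition fails
            return state;
        }
        onNext.accept(resultSelector.apply(state));
        return iterate.apply(state);
    }

    /** Drives step() until completion and collects the emitted items into a list. */
    static <S, T> List<T> generate(S initialState,
                                   Predicate<S> condition,
                                   UnaryOperator<S> iterate,
                                   Function<S, T> resultSelector) {
        List<T> out = new ArrayList<>();
        boolean[] done = {false};       // set by the completion callback
        S state = initialState;
        while (!done[0]) {
            state = step(state, condition, iterate, resultSelector,
                         out::add, () -> done[0] = true);
        }
        return out;
    }

    public static void main(String[] args) {
        // Start at 1, continue while i <= 5, advance by 1, emit the square of the state.
        List<Integer> squares = generate(1, i -> i <= 5, i -> i + 1, i -> i * i);
        System.out.println(squares);    // prints [1, 4, 9, 16, 25]
    }
}
```

With RxJava on the classpath, the body of `step` could presumably be moved into the `Func2` passed to SyncOnSubscribe.createStateful, calling `subscriber.onNext` and `subscriber.onCompleted` in place of the two callbacks. That wiring is an assumption and is not shown or tested here. Unlike the range example above, this sketch checks the condition before emitting, so an initial state that already fails the condition yields an empty sequence.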