Search code examples
rx-javarx-androidsubject-observer

RxJava - single Observable executing right away - why?


I have a very simple RXJava emitter. It is a publishSubject actually but its job to convert Integers to strings and update a UI element afterwards when it subscribes. The code looks like this :

PublishSubject integerToStringEmitter = PublishSubject.create();

    Subscription mysingle= Single.just(4).map(new Func1<Integer, String>() {


        @Override
        public String call(Integer integer) {
            return String.valueOf(integer);
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
            tv.setText(s);
        }
    });
// integerToStringEmitter.subscribe(); //it still emits even without this, why ?

What i want to happen: for subscription to only begin when i call integerToStringEmitter.subscribe();

What is happening currently : As soon as i launch the program the onNext is getting called and the UI element is being set to the # 4. why ? i need more control over this thing so it does not execute right away without me even subscribing to it. Please help solve.


Solution

  • I'm not sure what you're trying to achieve. Your code doesn't look right and shouldn't even compile. If it would compile then the subscribe(new Observer() {... part would obviously trigger the event stream.

    Here's a simple example of an Observable emitting integers and mapping them to Strings and an Observer subscribing to it:

    Observable<String> myObservable = Observable.just(1, 2, 3, 4).map(new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
            return String.valueOf(integer);
        }
    });
    
    Subscriber<String> mySubscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            System.out.println("onError: " + e.getMessage());
        }
    
        @Override
        public void onNext(String s) {
            System.out.println("onNext: " +s);
        }
    };
    
    myObservable.subscribe(mySubscriber);