Search code examples
springrx-javasubject-observer

run PublishSubject on different thread rxJava


I am running RxJava and creating a subject to use onNext() method to produce data. I am using Spring.

This is my setup:

@Component
public class SubjectObserver {
    private SerializedSubject<SomeObj, SomeObj> safeSource;
    public SubjectObserver() {
       safeSource = PublishSubject.<SomeObj>create().toSerialized();
       **safeSource.subscribeOn(<my taskthreadExecutor>);**
       **safeSource.observeOn(<my taskthreadExecutor>);** 
       safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
          @Override
          public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
            LOGGER.debug("{} invoked.", Thread.currentThread().getName());
            doSomething();
          }
      }
    }
    public void publish(SomeObj myObj) {
        safeSource.onNext(myObj);
    }
}

The way new data is generated on the RxJava stream is by @Autowire private SubjectObserver subjectObserver and then calling subjectObserver.publish(newDataObjGenerated)

No matter what I specify for subscribeOn() & observeOn():

  • Schedulers.io()
  • Schedulers.computation()
  • my threads
  • Schedulers.newThread

The onNext() and the actual work inside it is done on the same thread that actually calls the onNext() on the subject to generate/produce data.

Is this correct? If so, what am I missing? I was expecting the doSomething() to be done on a different thread.

Update

In my calling class, if I change the way I am invoking the publish method, then of course a new thread is allocated for the subscriber to run on.

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));

Thanks,


Solution

  • Each operator on Observable/Subject return a new instance with the extra behavior, however, your code just applies the subscribeOn and observeOn then throws away whatever they produced and subscribes to the raw Subject. You should chain the method calls and then subscribe:

    safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();
    
    safeSource
    .subscribeOn(<my taskthreadExecutor>)
    .observeOn(<my taskthreadExecutor>)
    .subscribe(new Subscriber<AsyncRemoteRequest>() {
         @Override
         public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
             LOGGER.debug("{} invoked.", Thread.currentThread().getName());
             doSomething();
         }
    });
    

    Note that subscribeOn has no practical effect on a PublishSubject because there is no subscription side-effect happening in its subscribe() method.