I am running RxJava and creating a subject to use onNext()
method to produce data. I am using Spring.
This is my setup:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
The way new data is generated on the RxJava stream is by @Autowire private SubjectObserver subjectObserver
and then calling subjectObserver.publish(newDataObjGenerated)
No matter what I specify for subscribeOn()
& observeOn()
:
The onNext()
and the actual work inside it is done on the same thread that actually calls the onNext()
on the subject to generate/produce data.
Is this correct? If so, what am I missing? I was expecting the doSomething()
to be done on a different thread.
Update
In my calling class, if I change the way I am invoking the publish
method, then of course a new thread is allocated for the subscriber to run on.
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
Thanks,
Each operator on Observable
/Subject
return a new instance with the extra behavior, however, your code just applies the subscribeOn
and observeOn
then throws away whatever they produced and subscribes to the raw Subject
. You should chain the method calls and then subscribe:
safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized();
safeSource
.subscribeOn(<my taskthreadExecutor>)
.observeOn(<my taskthreadExecutor>)
.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
});
Note that subscribeOn
has no practical effect on a PublishSubject
because there is no subscription side-effect happening in its subscribe()
method.