Tags: java, rx-java, reactive-programming, rx-java2, stream-processing

RxJava 2: Why can't PublishProcessor subscribe to an Observable?


I want to implement a rather simple DAG in RxJava.

We have a source of items:

Observable<String> itemsObservable = Observable.fromIterable(items);

Next, I'd like to have a processor that subscribes to itemsObservable and allows multiple subscribers to subscribe to it.

So I created:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

Unfortunately, this isn't possible:
itemsObservable.subscribe(itemsProccessor);

Why? What's the proper API to implement this kind of DAG?

Here's a diagram for demonstration:

[diagram image not available]

Here's my (failed) attempt at implementing this kind of DAG:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor); 

Solution

  • PublishProcessor implements Subscriber (the Reactive Streams interface), whereas Observable's subscribe method accepts an Observer, so the call doesn't compile. Convert itemsObservable to a Flowable with toFlowable and subscribe the processor to that:

        Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
        PublishProcessor<String> processor = PublishProcessor.create();
        items.toFlowable(BackpressureStrategy.BUFFER)
                .subscribe(processor);
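
For the full DAG from the question, something along these lines should work. Treat it as a minimal sketch rather than the definitive way to do it: the class name DagSketch, the method names viaProcessor and viaSubject, and the "A:"/"B:" labels are made up for illustration. Two things to note about the attempt in the question: doOnNext only runs a side effect and returns a new Flowable (which was discarded there), so map is what actually transforms the items; and the subscribers have to be attached before the source is connected, because PublishProcessor doesn't replay earlier items. If you'd rather stay in the Observable world, PublishSubject is the Observer-based counterpart and can be passed to Observable.subscribe directly.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class for illustration; requires RxJava 2.x on the classpath.
public class DagSketch {

    // Variant 1: bridge the Observable into a Flowable and fan out via PublishProcessor.
    static List<String> viaProcessor(List<String> items) {
        List<String> received = new ArrayList<>();
        Observable<String> itemsObservable = Observable.fromIterable(items);

        PublishProcessor<String> processor = PublishProcessor.create();
        // map returns a NEW Flowable; keep the reference and subscribe to it.
        Flowable<String> upper = processor.map(String::toUpperCase);

        // Attach both downstream nodes first: PublishProcessor does not replay.
        upper.subscribe(s -> received.add("A:" + s));
        upper.subscribe(s -> received.add("B:" + s));

        // Connect the source last; fromIterable emits synchronously on subscribe.
        itemsObservable.toFlowable(BackpressureStrategy.BUFFER).subscribe(processor);
        return received;
    }

    // Variant 2: stay with Observable and use PublishSubject, which is an Observer.
    static List<String> viaSubject(List<String> items) {
        List<String> received = new ArrayList<>();
        Observable<String> itemsObservable = Observable.fromIterable(items);

        PublishSubject<String> subject = PublishSubject.create();
        Observable<String> upper = subject.map(String::toUpperCase);

        upper.subscribe(s -> received.add("A:" + s));
        upper.subscribe(s -> received.add("B:" + s));

        // Compiles because PublishSubject implements Observer.
        itemsObservable.subscribe(subject);
        return received;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        System.out.println(viaProcessor(items));
        System.out.println(viaSubject(items));
    }
}
```

Both variants deliver every item to both subscribers, in subscription order. The Flowable route adds backpressure handling (BUFFER here, which is unbounded), while the PublishSubject route has none, so for a small in-memory list like this the subject is probably the simpler choice.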