Search code examples
reactive-programmingpublish-subscribejava-9reactivereactive-streams

How to pass a message to another Subscriber?


I am learning the Reactive Stream API of Java 9.

As it has the Publisher and Subscriber, and the Subscriber subscribes to the Publisher and the Subscriber also implements the following messages:

public class TestSubscriber<T> implements Subscriber<T> {

    @Override
    public void onComplete() {
        // TODO Auto-generated method stub

    }

    @Override
    public void onError(Throwable arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onNext(T arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onSubscribe(Subscription arg0) {
        // TODO Auto-generated method stub

    }

}

I did not found any method in Subscriber which passes/transfers the message to another subscriber. Any suggestions?


Solution

  • This can be done implementing the Flow.Processor as follows:

    import java.util.concurrent.Flow;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    
    public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    
        private Function<T, R> function;
        private Flow.Subscription subscription;
    
        public MyTransformer(Function<T, R> function) {
            super();
            this.function = function;
        }
    
        @Override
        public void onComplete() {
            System.out.println("Transformer Completed");
        }
    
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    
        @Override
        public void onNext(T item) {
            System.out.println("Transformer Got : "+item);
            submit(function.apply(item));
            subscription.request(1);
    
        }
    
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
    
    
    }
    

    And when calling it use as follows:

    public class TestTransformer {
    
        public static void main(String... args) {
            SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
            MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);
    
            TestSubscriber<Integer> subscriber = new TestSubscriber<>();
            List<String> items = List.of("1", "2", "3");
    
            List<Integer> expectedResult = List.of(1, 2, 3);
    
            publisher.subscribe(transformProcessor);
            transformProcessor.subscribe(subscriber);
            items.forEach(publisher::submit);
            publisher.close();
    
        }
    }