Java 9 - how Publisher and Subscriber work


I am trying to understand how Subscriber and Publisher work in Java 9.

I have created one subscriber and am using SubmissionPublisher to publish items.

I am trying to publish 100 strings to the subscriber. If I do not make the client program sleep (see the commented-out code in MyReactiveApp), not all of the items are delivered.

Why does it not wait for all the strings to be processed here:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 

If I replace the above code with the following, all the strings are printed to the console:

strs.stream().forEach(System.out::println);

Client program that publishes using SubmissionPublisher.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) {
            Thread.sleep(10);
        }*/

        //publisher.close();

        System.out.println("Exiting the app");

    }

    private static List<String> getStrs(){

        return Stream.generate(new Supplier<String>() {
            int i =1;
            @Override
            public String get() {
                return "name "+ (i++);
            }
        }).limit(100).collect(Collectors.toList());
    }

}

Subscriber

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(100);

    }

    @Override
    public void onNext(String item) {
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(this.getClass().getName()+ " an error occurred "+throwable);

    }

    @Override
    public void onComplete() {
        System.out.println("activity completed");

    }
    public int getCounter() {
        return counter;
    }

}

output:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12

Solution

  • SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    

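    According to the documentation, this constructor creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless the common pool does not support a parallelism level of at least two, in which case a new thread is created to run each task).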

    see: https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

    So actually

        strs.stream().forEach(i -> publisher.submit(i));
    

    only enqueues the submissions; they are delivered asynchronously on another thread. The main thread meanwhile runs to the end of main and the application terminates, independent of the progress of the worker thread. In other words, the application terminates regardless of how many elements the worker thread has already delivered.

    The result can differ from run to run. In the worst case, the application terminates before the first item is delivered.
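One way to keep the application alive until delivery has finished is to close the publisher and wait for the subscriber's onComplete. The following is a minimal sketch under that assumption, not the code from the question: the class AwaitDeliveryDemo, the CountingSubscriber and the method publishAndAwait are hypothetical names introduced for illustration, and the subscriber requests Long.MAX_VALUE items, so it applies no backpressure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class AwaitDeliveryDemo {

    // Counts delivered items and completes a future when the stream ends.
    static class CountingSubscriber implements Flow.Subscriber<String> {
        final AtomicInteger received = new AtomicInteger();
        final CompletableFuture<Void> finished = new CompletableFuture<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no backpressure in this sketch
        }

        @Override
        public void onNext(String item) {
            received.incrementAndGet();
        }

        @Override
        public void onError(Throwable throwable) {
            finished.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            finished.complete(null);
        }
    }

    // Publishes `count` strings and blocks until the subscriber has seen onComplete.
    static int publishAndAwait(int count) {
        CountingSubscriber subscriber = new CountingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit("name " + i);
            }
        } // close() signals onComplete once the queued items have been delivered

        subscriber.finished.join(); // keeps the calling thread alive until then
        return subscriber.received.get();
    }

    public static void main(String[] args) {
        System.out.println("Delivered " + publishAndAwait(100) + " items");
    }
}
```

Because main now blocks on the future, a user thread stays alive until the last item and the completion signal have been delivered, so no sleep loop is needed.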

    Threads

    To verify that the main method of MyReactiveApp and the delivery in MySubscriber's onNext run on different threads, print the name of the current thread in each. For example, in MyReactiveApp's main,

    System.out.println(Thread.currentThread().getName());
    

    prints main as the thread name,

    whereas the same statement in MySubscriber's onNext prints something like ForkJoinPool.commonPool-worker-1.
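The thread-name check can also be tried in isolation. The sketch below is an illustration only: ThreadNameDemo and deliveryThreadName are hypothetical names, and it uses SubmissionPublisher.consume instead of the question's MySubscriber to stay short. The exact worker name varies between runs and machines, so no fixed output is shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the question's code.
class ThreadNameDemo {

    // Submits a single item and returns the name of the thread that consumed it.
    static String deliveryThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes a simple consumer; the returned future
        // completes when the publisher signals onComplete.
        CompletableFuture<Void> done =
                publisher.consume(item -> name.set(Thread.currentThread().getName()));

        publisher.submit("name 1");
        publisher.close();
        done.join(); // wait until the item has been delivered
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("submitting thread: " + Thread.currentThread().getName());
        System.out.println("delivering thread: " + deliveryThreadName());
    }
}
```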

    User and Daemon Threads

    Why does the application terminate although we still have a running thread?

    There are two kinds of threads in Java:

    • user threads
    • daemon threads

    A Java program terminates when no user threads are running any more, even if daemon threads are still running.

    The main thread is a user thread. The SubmissionPublisher here uses worker threads from ForkJoinPool.commonPool(), and these are daemon threads.

    The ForkJoinPool documentation states: "All worker threads are initialized with Thread.isDaemon() set true."

    https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html
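The daemon status of the delivering thread can be checked directly. The following is a rough sketch with hypothetical names (DaemonCheckDemo, deliveryThreadIsDaemon). As an assumption made for this example, it passes ForkJoinPool.commonPool() to the publisher explicitly, so that delivery runs on a common-pool worker even on machines where the no-argument constructor would fall back to one thread per task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the question's code.
class DaemonCheckDemo {

    // Submits one item and reports whether the delivering thread is a daemon thread.
    static boolean deliveryThreadIsDaemon() {
        AtomicBoolean daemon = new AtomicBoolean();

        // Assumption for this sketch: use the common pool explicitly.
        SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), Flow.defaultBufferSize());

        CompletableFuture<Void> done =
                publisher.consume(item -> daemon.set(Thread.currentThread().isDaemon()));

        publisher.submit("name 1");
        publisher.close();
        done.join(); // wait for delivery before reading the flag
        return daemon.get();
    }

    public static void main(String[] args) {
        System.out.println("main is daemon: " + Thread.currentThread().isDaemon());
        System.out.println("delivering thread is daemon: " + deliveryThreadIsDaemon());
    }
}
```

When started from a regular main method, the first line should report false and the second true, which matches the user/daemon distinction described above.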