Search code examples
javamultithreadingconcurrencyjava.util.concurrentphaser

Registering thread to Phaser


I am learning about Phaser. While doing so, I came across a problem. Below is the code that I have,

public class RunnableTask implements Runnable {

    private Phaser phaser;

    public RunnableTask(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register();  // Question
    }

    @Override
    public void run() {
        // this.phaser.register();  // Question
        print("After register");
        for (int i = 0; i < 2; i++) {
            sleep();
            print("Before await" + i + ":");
            this.phaser.arriveAndAwaitAdvance();
            print("After advance" + i + ":");
        }
    }

    private void sleep() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void print(String msg) {

        System.out.println(String.format("%s: %s, time=%s, registered=%s, arrived=%s, unarrived=%s, phase=%s.", msg,
                Thread.currentThread().getName(), LocalTime.now(), this.phaser.getRegisteredParties(),
                this.phaser.getArrivedParties(), this.phaser.getUnarrivedParties(), this.phaser.getPhase()));
    }

 }

Sample test for the above

public class TestPhaser {

    public static void main(String[] args) {

        Phaser phaser = new Phaser();

        RunnableTask task = new RunnableTask(phaser);

        Thread t1 = new Thread(task, "t1");
        Thread t2 = new Thread(task, "t2");
        Thread t3 = new Thread(task, "t3");

        t1.start();
        t2.start();
        t3.start();
    } 
}

Upon executing the above program the output is :

After register: t3, time=22:01:26.636, registered=1, arrived=0, unarrived=1, phase=0.

After register: t2, time=22:01:26.636, registered=1, arrived=0, unarrived=1, phase=0.

After register: t1, time=22:01:26.636, registered=1, arrived=0, unarrived=1, phase=0.

Before await 0:: t3, time=22:01:28.728, registered=1, arrived=0, unarrived=1, phase=0.

Before await 0:: t2, time=22:01:28.728, registered=1, arrived=0, unarrived=1, phase=0.

Before await 0:: t1, time=22:01:28.728, registered=1, arrived=0, unarrived=1, phase=0.

After advance 0:: t1, time=22:01:28.728, registered=1, arrived=0, unarrived=1, phase=3.

After advance 0:: t2, time=22:01:28.728, registered=1, arrived=0, unarrived=1, phase=3.

After advance 0:: t3, time=22:01:28.729, registered=1, arrived=0, unarrived=1, phase=3.

Before await 1:: t2, time=22:01:30.730, registered=1, arrived=0, unarrived=1, phase=3.

Before await 1:: t3, time=22:01:30.730, registered=1, arrived=0, unarrived=1, phase=3.

After advance 1:: t2, time=22:01:30.730, registered=1, arrived=0, unarrived=1, phase=4.

After advance 1:: t3, time=22:01:30.732, registered=1, arrived=0, unarrived=1, phase=5.

Before await 1:: t1, time=22:01:30.730, registered=1, arrived=0, unarrived=1, phase=3.

After advance 1:: t1, time=22:01:30.732, registered=1, arrived=0, unarrived=1, phase=6.

You can see there are lots of discrepancies here. The threads are not advanced in a sequence. Also, there are few phases missing or/and not in a sequence.

When I moved the line of code this.phaser.register() from constructor to the beginning of the run method, the output is :

After register: t1, time=22:10:58.230, registered=3, arrived=0, unarrived=3, phase=0.

After register: t3, time=22:10:58.230, registered=3, arrived=0, unarrived=3, phase=0.

After register: t2, time=22:10:58.230, registered=3, arrived=0, unarrived=3, phase=0.

Before await 0:: t2, time=22:11:00.314, registered=3, arrived=0, unarrived=3, phase=0.

Before await 0:: t1, time=22:11:00.314, registered=3, arrived=0, unarrived=3, phase=0.

Before await 0:: t3, time=22:11:00.314, registered=3, arrived=0, unarrived=3, phase=0.

After advance 0:: t2, time=22:11:00.315, registered=3, arrived=0, unarrived=3, phase=1.

After advance 0:: t3, time=22:11:00.315, registered=3, arrived=0, unarrived=3, phase=1.

After advance 0:: t1, time=22:11:00.315, registered=3, arrived=0, unarrived=3, phase=1.

Before await 1:: t1, time=22:11:02.319, registered=3, arrived=0, unarrived=3, phase=1.

Before await 1:: t2, time=22:11:02.319, registered=3, arrived=0, unarrived=3, phase=1.

Before await 1:: t3, time=22:11:02.319, registered=3, arrived=0, unarrived=3, phase=1.

After advance 1:: t3, time=22:11:02.320, registered=3, arrived=0, unarrived=3, phase=2.

After advance 1:: t2, time=22:11:02.320, registered=3, arrived=0, unarrived=3, phase=2.

After advance 1:: t1, time=22:11:02.321, registered=3, arrived=0, unarrived=3, phase=2.

This looks much better that the threads execution and phases are in a sequence.

Here are my questions:

1) Why there are lots of discrepancies when parties were registered inside the constructor of Runnable?

2) In the second result, the stats for arrived and unarrived are zero (incorrect) at each phase. So, how to obtain the correct numbers for them?

Any help is appreciated.


Solution

  • In first example "creation phaser in constructor", you are registering only one thread to Phaser. You have to create three tasks, to register three threads in phaser.

    Change the code like this, and it will work. (dont forget to remove initial RunnableTask task = new RunnableTask(phaser); from your code)

        Thread t1 = new Thread(new RunnableTask(phaser), "t1");
        Thread t2 = new Thread(new RunnableTask(phaser), "t2");
        Thread t3 = new Thread(new RunnableTask(phaser), "t3");
    

    In second example, you wait exactly 2 seconds in all threads, this is to exact and all of them arrive and await almost at the same time, change your sleep method like this to introduce some different waits for the threads to see some arrived and unarrived threads

      private void sleep() {
        try {
          Random r = new Random();    
          TimeUnit.SECONDS.sleep(r.nextInt(5));
        } catch(InterruptedException e) {
          e.printStackTrace();
        }
      }
    

    Your second example works but its not correct.It works only because you have sleep at the beginning of the run method so all threads catch up to register in phaser before you call arrive and advance method on phaser. If you would remove the sleep then after calling this line

    t1.start();
    

    T1 run method will be run and t1 thread would be register in the phaser. Then is possible that the this.phaser.arriveAndAwaitAdvance() in t1 run method will be called before threads t2 and t3 are started and registrated in the phaser so the phaser will not wait for them.

    You should register into phaser in constructor of the task, or in the method that is called before you start the thread.