I read the Java 8 documentation for CountedCompleter, which gives the following sample usage code:
    class MyOperation<E> { void apply(E e) { ... } }

    class ForEach<E> extends CountedCompleter<Void> {
        public static <E> void forEach(E[] array, MyOperation<E> op) {
            new ForEach<E>(null, array, op, 0, array.length).invoke();
        }

        final E[] array; final MyOperation<E> op; final int lo, hi;

        ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
            super(p);
            this.array = array; this.op = op; this.lo = lo; this.hi = hi;
        }

        public void compute() { // version 1
            if (hi - lo >= 2) {
                int mid = (lo + hi) >>> 1;
                setPendingCount(2); // must set pending count before fork
                new ForEach(this, array, op, mid, hi).fork(); // right child
                new ForEach(this, array, op, lo, mid).fork(); // left child
            }
            else if (hi > lo)
                op.apply(array[lo]);
            tryComplete();
        }
    }
In the compute method, every ForEach object that splits its range sets its pending count to 2 and forks two subtasks, but the tryComplete call at the end of compute can only decrement the pending count by one. What happens to the remaining count?
After reading the source code of ForkJoinPool and CountedCompleter, I finally understand it.
All CountedCompleters derived from the root CountedCompleter are organized as a tree. When you call tryComplete, if the current pending count is positive, it is decremented by 1 and the call returns. Otherwise onCompletion of the current CountedCompleter is called, and the same check is then repeated on the parent CountedCompleter. If the parent is null, the current CountedCompleter is the root, and the whole task is marked complete.
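The walk described above can be sketched as a small single-threaded model. This is only an illustration under stated assumptions: `Node`, `pending`, `completed` and `done` are hypothetical names, not the JDK class or its fields, and the real CountedCompleter decrements its pending count atomically rather than with a plain `--`.

```java
// Simplified, single-threaded model of the tryComplete() walk.
// Node is a hypothetical stand-in for CountedCompleter, not the JDK class.
class TryCompleteModel {
    static class Node {
        final Node parent;   // null for the root
        int pending;         // completions this node is still waiting for
        boolean completed;   // set where the real class would call onCompletion
        boolean done;        // set on the root when the whole task finishes

        Node(Node parent, int pending) {
            this.parent = parent;
            this.pending = pending;
        }

        void tryComplete() {
            Node a = this;
            while (true) {
                if (a.pending > 0) {      // still waiting: count down and stop
                    a.pending--;
                    return;
                }
                a.completed = true;       // pending == 0: this node is complete
                if (a.parent == null) {   // reached the root: whole task is done
                    a.done = true;
                    return;
                }
                a = a.parent;             // repeat the same check on the parent
            }
        }
    }

    public static void main(String[] args) {
        Node root = new Node(null, 2);
        Node right = new Node(root, 0);
        Node left = new Node(root, 0);

        root.tryComplete();              // root's own call: pending 2 -> 1
        right.tryComplete();             // right completes, root pending 1 -> 0
        System.out.println(root.done);   // false: no call has reached root at 0 yet
        left.tryComplete();              // left completes, finds root at 0
        System.out.println(root.done);   // true
    }
}
```

Note that a pending count of 0 alone does not finish a node in this model. One more tryComplete has to arrive at it, which is why a count of 2 is matched by three calls: the node's own and one from each child.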
So we know that:
- A CountedCompleter task does not end when its compute() method returns. It ends only when a tryComplete call reaches it while its pending count is already 0.
- A CountedCompleter task isn't always ended by the root task itself (very different from RecursiveTask and RecursiveAction). It can be ended by a child's tryComplete.
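Both points can be checked against the real class by calling tryComplete by hand, without any pool or forking. The sketch below is a minimal experiment, not how the class is normally used: `Step` and `trace` are hypothetical names introduced here, and `compute()` is left empty on purpose because it is never run.

```java
import java.util.concurrent.CountedCompleter;

class ManualCompletionDemo {
    // Hypothetical minimal completer: compute() is never executed here,
    // tryComplete() is called by hand to watch the bookkeeping.
    static class Step extends CountedCompleter<Void> {
        Step(CountedCompleter<?> parent) { super(parent); }
        @Override public void compute() { }
    }

    // Appends the parent's "pendingCount:isDone" to the trace.
    static void record(StringBuilder sb, Step parent) {
        sb.append(parent.getPendingCount()).append(':')
          .append(parent.isDone()).append(' ');
    }

    // Returns the parent's state after each of the three tryComplete() calls.
    static String trace() {
        Step parent = new Step(null);
        parent.setPendingCount(2);
        Step right = new Step(parent);
        Step left = new Step(parent);

        StringBuilder sb = new StringBuilder();
        parent.tryComplete();   // what the parent does at the end of compute()
        record(sb, parent);
        right.tryComplete();    // first child finishes
        record(sb, parent);
        left.tryComplete();     // second child finishes and completes the parent
        record(sb, parent);
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(trace());   // 1:false 0:false 0:true
    }
}
```

The parent is still not done after its own call, and it becomes done inside the second child's tryComplete, which is the sense in which a child ends the parent task.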
Now let's look at the code fragment from the Java 8 documentation again. The sequence numbers 0:, 1:, 2:, 3: in the comments mark one possible execution order of the operations on a task's pending count:
    class MyOperation<E> { void apply(E e) { ... } }

    class ForEach<E> extends CountedCompleter<Void> {
        public static <E> void forEach(E[] array, MyOperation<E> op) {
            new ForEach<E>(null, array, op, 0, array.length).invoke();
        }

        final E[] array; final MyOperation<E> op; final int lo, hi;

        ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
            super(p);
            this.array = array; this.op = op; this.lo = lo; this.hi = hi;
        }

        public void compute() { // version 1
            if (hi - lo >= 2) {
                int mid = (lo + hi) >>> 1;
                setPendingCount(2); // 0: +2
                new ForEach(this, array, op, mid, hi).fork(); // 2: -1 when the right child completes
                new ForEach(this, array, op, lo, mid).fork(); // 3: left child completes, finds pending count == 0, completes this task
            }
            else if (hi > lo)
                op.apply(array[lo]);
            tryComplete(); // 1: -1 (this task's own call)
        }
    }
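To check the trace above end to end, here is a runnable variant of the same class. It is a sketch under a few assumptions: `java.util.function.Consumer` stands in for MyOperation (whose body is elided in the documentation), and the `completed` counter, the `onCompletion` override and the `run` helper are additions made only to observe what happens.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ForEachDemo<E> extends CountedCompleter<Void> {
    final E[] array; final Consumer<E> op; final int lo, hi;
    final AtomicInteger completed;   // counts onCompletion calls (demo only)

    ForEachDemo(CountedCompleter<?> p, E[] array, Consumer<E> op,
                int lo, int hi, AtomicInteger completed) {
        super(p);
        this.array = array; this.op = op; this.lo = lo; this.hi = hi;
        this.completed = completed;
    }

    @Override public void compute() {
        if (hi - lo >= 2) {
            int mid = (lo + hi) >>> 1;
            setPendingCount(2);   // must be set before forking
            new ForEachDemo<E>(this, array, op, mid, hi, completed).fork();
            new ForEachDemo<E>(this, array, op, lo, mid, completed).fork();
        } else if (hi > lo) {
            op.accept(array[lo]);
        }
        tryComplete();            // own call; the two children supply the rest
    }

    // Called once per task, at the moment that task is completed.
    @Override public void onCompletion(CountedCompleter<?> caller) {
        completed.incrementAndGet();
    }

    // Runs the task over 1..n; returns {sum of elements, onCompletion calls}.
    static int[] run(int n) {
        Integer[] data = new Integer[n];
        for (int i = 0; i < n; i++) data[i] = i + 1;
        AtomicInteger sum = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        new ForEachDemo<Integer>(null, data, sum::addAndGet, 0, n, completed).invoke();
        return new int[] { sum.get(), completed.get() };
    }

    public static void main(String[] args) {
        int[] r = run(8);
        System.out.println(r[0] + " " + r[1]);   // 36 15
    }
}
```

For 8 elements the task tree has 1 + 2 + 4 + 8 = 15 nodes, and each one is completed exactly once, even though every splitting node sets its pending count to 2 and calls tryComplete itself only once.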