Search code examples
jakarta-eeconcurrencymanagedthreadfactory

How to build a recursive tree in Java-ee?


Here is my pseudo-code:

class Builder implements Callable<T> {
  T obj;
  ManagedExecutorService pool;

  Builder (T obj, ManagedExecutorService pool){
    this.obj = obj;
    this.pool = pool;
  }

  T call(){
    build();
  }

  private void build(){
        // skip if already traversed
        return isTraversed(obj);

        // call db and get obj's one-to-many relationships
        Collection<T> colOfChildObj = DbUtil.getChildrenPOJO(obj);
        for (<Collection>T childObj : colOfChildObj){
            this.pool.submit(new Builder(childObj, this.pool));
        }
        // set children as-is, when the submit above completes,
        // it will update childObj and thus will reflect
        // obj.childObj.itsChidren etc. For this though the caller
        // has to wait until all submits are processed 
        obj.setChildren(colOfChildObj); 
  }
}

Since Java-ee does not support ForkJoinPool - that is out of the question. So how do I do it with either ManagedThreadFactory and/or ManagedExecutorService? My real challenge is due to not being able to call pool.shutdown() or pool.awaitTermination in Java-ee. So, from the caller, if I do:

class Caller () {
  T getObjGraph(T rootObj){
     pool.submit(new Builder(rootObj));
     T objGraph = pool.get();
     return objGraph;
  }
} 

Then my method does not wait for all the pool.submit(new Builder(childObj, pool)) and thus my object does not have everything set and is incomplete. I thought of putting all Futures returned by pool.submit into a blocking queue - but then I don't know how to inform the caller that my tree traversal is complete. I do have a counter that reaches 0 when the tree traversal is complete but since the client is submitting a top level node, I'm not sure how to make it wait there in Java-ee without while(isCounter = 0) - which is a CPU hog.

Any pointers?


Solution

  • I think I understand what you're trying to do. You can just use a thread-safe counter, increment it every time you create and submit a new task for a given node, and decrement it when the task for this node is terminated.

    In the main thread, you wait on a lock util the remaining number of nodes to process is 0. And in each task, you notify the lock to signal that a tack is terminated.

    Here is a complete example. It starts from a tree where each node has a name, and transforms this tree into another tree where each node is "Hello " concatenated with the original name.

    public class Tree {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Node root = new Node("R");
    
            Node c1 = new Node("C1");
            Node c2 = new Node("C2");
    
            root.addChild(c1);
            root.addChild(c2);
    
            Node gc11 = new Node("GC11");
            Node gc12 = new Node("GC12");
            c1.addChild(gc11);
            c1.addChild(gc12);
    
            Node gc21 = new Node("GC11");
            Node gc22 = new Node("GC12");
            c2.addChild(gc21);
            c2.addChild(gc22);
    
            System.out.println("root = " + root);
    
            ExecutorService executor = Executors.newFixedThreadPool(4);
            final Object lock = new Object();
            final AtomicInteger remaining = new AtomicInteger(0);
            Future<Node> result = executor.submit(new HelloTask(root, null, executor, remaining, lock));
    
            synchronized (lock) {
                while (remaining.get() != 0) {
                    lock.wait();
                }
            }
    
            Node helloRoot = result.get();
    
            System.out.println("helloRoot = " + helloRoot);
    
            executor.shutdown();
        }
    
        private static class HelloTask implements Callable<Node> {
            private final Node source;
            private final Node parent;
            private final ExecutorService executorService;
            private final Object lock;
            private final AtomicInteger remaining;
    
            public HelloTask(Node source, Node parent, ExecutorService executorService, AtomicInteger remaining, Object lock) {
                this.source = source;
                this.parent = parent;
                this.executorService = executorService;
                this.lock = lock;
                this.remaining = remaining;
                remaining.incrementAndGet();
            }
    
            @Override
            public Node call() throws Exception {
                // simulate some time
                Thread.sleep(1000L);
                Node result = new Node("Hello " + source.getName());
                if (parent != null) {
                    parent.addChild(result);
                }
                for (Node child : source.getChildren()) {
                    executorService.submit(new HelloTask(child, result, executorService, remaining, lock));
                }
    
                remaining.decrementAndGet();
                synchronized (lock) {
                    lock.notifyAll();
                }
                return result;
            }
        }
    
        private static class Node {
            private final String name;
            private final List<Node> children = new CopyOnWriteArrayList<>();
    
            public Node(String name) {
                this.name = name;
            }
    
            public String getName() {
                return name;
            }
    
            public List<Node> getChildren() {
                return children;
            }
    
            public void addChild(Node child) {
                this.children.add(child);
            }
    
            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(name);
                sb.append('\n');
                children.forEach(sb::append);
                return sb.toString();
            }
        }
    }