
Java wait notify deadlock issue


Here's my class, which executes dependent Runnables serially. All tasks run in parallel, but when a task finishes, it waits until it is at the head of the queue before running its completion callback. Once the head task completes and is removed, the next task in the queue can complete, and so on.

The problem is that this code ends up in some kind of deadlock: when executed with many tasks, it stops making progress. Pausing in the debugger shows that all threads are waiting on the wait() statement.

/**
 * Executes all tasks in parallel, with completion handler called only when other tasks of same key are complete.
 * For a given key, the order in which {@link #execute(Object, int, java.util.concurrent.Callable, Runnable, Runnable)} was called will be the order in which completion runnable will be called.
 */
public class DependentExecutor {

    private final Executor executor;
    private final Map<Object, Queue<DependentTask>> allTasks = new ArrayMap<>();
    private final boolean enableDependency;

    public DependentExecutor(boolean enableDependency, Executor executor) {
        this.executor = executor;
        this.enableDependency = enableDependency;
    }

    /**
     * You should return true from the task on successful completion.
     * If task returns false, then completion runnable won't be executed.
     * <p/>
     * This method will return false if a task with this uniqueId already exists. Otherwise true is returned.
     *
     * @param key                A non null key using which task dependency is decided. Tasks with same key are dependent.
     * @param uniqueId           If there is a task with this uniqueId already present, this task will be rejected
     * @param task               Optional. A long pending task to be performed or null if only completion is to be dependent.
     * @param completionCallback A non null callback which will be serially executed for tasks with same key
     * @param errorCallback      If task returns false, then this callback will be invoked immediately (no dependency)
     */
    public boolean execute(Object key, int uniqueId, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {

        DependentTask queuedTask;
        synchronized (allTasks) {
            Queue<DependentTask> queue = allTasks.get(key);
            for (Map.Entry<Object, Queue<DependentTask>> objectQueueEntry : allTasks.entrySet()) {
                synchronized (objectQueueEntry.getValue()) {
                    Iterator<DependentTask> iterator = objectQueueEntry.getValue().iterator();
                    while (iterator.hasNext()) {
                        DependentTask dependentTask = iterator.next();
                        if (dependentTask.getUniqueId() == uniqueId) {
                            // no 2 tasks can have same uniqueID
                            return false;
                        }
                    }
                }
            }

            if (queue == null && task == null) {
                // this means we have no pending dependency as well as no task to perform. So only callback.
                completionCallback.run();
                return true;
            } else if (queue == null) {
                queue = new LinkedList<DependentTask>();
                allTasks.put(key, queue);
            }
            if (!enableDependency) {
                key = Math.random();
            }
            queuedTask = new DependentTask(key, uniqueId, queue, task, completionCallback, errorCallback);
            queue.add(queuedTask);
        }
        executor.execute(queuedTask);
        return true;
    }

    class DependentTask implements Runnable {

        private final Queue<DependentTask> dependencyQueue;
        private final Callable<Boolean> task;
        private final Object key;
        private final Runnable completionCallback;
        private final Runnable errorCallback;
        private final int uniqueId;

        public DependentTask(Object key, int uniqueId, Queue<DependentTask> dependencyQueue, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {
            this.uniqueId = uniqueId;
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
            this.completionCallback = completionCallback;
            this.errorCallback = errorCallback;
        }

        public int getUniqueId() {
            return uniqueId;
        }

        @Override
        public void run() {
            Boolean result = false;
            try {
                if (task != null) {
                    result = task.call();
                } else {
                    result = true;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (result) {
                    synchronized (dependencyQueue) {

                        while (dependencyQueue.peek() != this) {
                            try {
                                dependencyQueue.wait(); // deadlock !!
                            } catch (InterruptedException e) {
                            }
                        }
                    }
                    completionCallback.run(); // by now we are the first element in the linked list. Lets call completion.
                } else {
                    errorCallback.run(); // by now we are the first element in the linked list. Lets call error callback.
                }
                synchronized (dependencyQueue) {
                    dependencyQueue.remove(); //remove thyself
                    dependencyQueue.notifyAll();
                }

                // clean up of main map
                synchronized (allTasks) {
                    if (dependencyQueue.isEmpty()) {
                        allTasks.remove(key);
                    }
                }
            }
        }
    }
}

Solution

  • Problem#1

    Your logic for removing "self" from the queue is faulty. You remove from the queue unconditionally, and always from the head (the task doesn't actually remove itself; dependencyQueue.remove() always removes the head element), but the check that the task is actually at the head is conditional: it only runs if the implementation task returned true.

    So, any time the implementation task returns false or fails with an exception, the task removes whatever is at the head of the queue, which is quite likely to be some other task. That other task is still running, will never find itself at the head, and will wait forever.

  • Problem#2

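    You are modifying dependencyQueue without holding its lock. Your queue implementation is LinkedList, which is not thread safe. remove() runs inside synchronized (dependencyQueue), but add() runs only inside synchronized (allTasks), so the two calls are not mutually exclusive. You should use: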

    synchronized (queue) {
        queue.add(queuedTask);
    }
    

    when you add the new tasks to the queue.

    What is most likely happening is that add() is called concurrently with remove(), which corrupts the internal state of the list. The add() effectively fails (the list doesn't contain the added element), so the corresponding thread will never find itself in the list. If you can reproduce this easily, you can test it by attaching the debugger and inspecting the contents of the queue: you'll see that the tasks of the "hung" threads are not even present there.
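
To make the two fixes concrete, here is a minimal sketch, not a drop-in replacement for DependentExecutor. It assumes a single key and leaves out the uniqueId check and the null-task case. The names OrderedCompletion, Entry, awaitHead and demo are hypothetical and exist only for illustration. Every queue access holds the queue's own lock, and a task removes itself with remove(this) instead of removing whatever is at the head.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical, simplified stand-in for DependentExecutor: one key, no uniqueId check.
final class OrderedCompletion {
    // Every access to this queue happens while holding the queue's own monitor.
    private final Queue<Entry> queue = new ArrayDeque<>();
    private final ExecutorService executor;

    OrderedCompletion(ExecutorService executor) {
        this.executor = executor;
    }

    void execute(Callable<Boolean> task, Runnable onSuccess, Runnable onError) {
        Entry entry = new Entry(task, onSuccess, onError);
        synchronized (queue) { // Problem#2: add() takes the same lock as remove()
            queue.add(entry);
        }
        executor.execute(entry);
    }

    private final class Entry implements Runnable {
        private final Callable<Boolean> task;
        private final Runnable onSuccess;
        private final Runnable onError;

        Entry(Callable<Boolean> task, Runnable onSuccess, Runnable onError) {
            this.task = task;
            this.onSuccess = onSuccess;
            this.onError = onError;
        }

        @Override
        public void run() {
            boolean ok = false;
            try {
                ok = Boolean.TRUE.equals(task.call());
            } catch (Exception e) {
                // An exception is treated like a false result.
            }
            try {
                if (ok) {
                    awaitHead(); // only successful tasks wait for their turn
                    onSuccess.run();
                } else {
                    onError.run(); // runs immediately, wherever this entry sits
                }
            } finally {
                synchronized (queue) {
                    queue.remove(this); // Problem#1: remove this entry, not the head
                    queue.notifyAll();
                }
            }
        }

        private void awaitHead() {
            boolean interrupted = false;
            synchronized (queue) {
                while (queue.peek() != this) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        interrupted = true; // keep waiting, restore the flag below
                    }
                }
            }
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    // Runs 8 tasks on 4 threads; task 3 fails, later tasks finish their work sooner.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        OrderedCompletion ordered = new OrderedCompletion(pool);
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 8; i++) {
            final int id = i;
            Callable<Boolean> work = () -> { Thread.sleep((8 - id) * 5L); return id != 3; };
            ordered.execute(work, () -> completed.add(id), () -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1, 2, 4, 5, 6, 7]
    }
}
```

In this sketch the failing task (id 3) removes only itself, so the tasks queued behind it still reach the head and complete in submission order. Note that a task waiting for its turn keeps a pool thread occupied, just as in the original code.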