Search code examples
javacompletable-futurearraydequework-stealing

Java 8: ArrayDeque<>.poll returning null in parallel environment


I am trying to maintain a list of items among multiple threads, one for each thread (lets say one socket connection for each thread for example). I am maintaining this list in an ArrayDeque<>. The problem I am facing is the ArrayDeque<> is exceeding the no of items more than no. of threads in thread pool.

Here is my code:

package com.practice;

import java.util.ArrayDeque;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureApp implements AutoCloseable {
    private static void sleep(long timeMS) {
        try {
            Thread.sleep(timeMS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try (CompletableFutureApp completableFutureApp = new CompletableFutureApp()) {

            Runnable[] tasksList1 = new Runnable[100];

            for (int i = 0; i < tasksList1.length; i++) {
                String msg1 = "TaskList 1 no.: " + i;

                tasksList1[i] = () -> {
                    //System.out.println(msg1);
                    sleep(300);
                };
            }

            completableFutureApp
                    .executeTasks(tasksList1)
                    .thenRun(() -> System.out.println("All tasks completed successfully"));
        }
    }

    private ExecutorService threadPool = Executors.newWorkStealingPool(10);
    private ArrayDeque<Integer> itemsAvailable = new ArrayDeque<>();

    private CompletableFuture executeTasks(Runnable... tasks) {
        CompletableFuture[] futures = new CompletableFuture[tasks.length];

        for (int i = 0; i < tasks.length; i++) {
            Runnable task = tasks[i];

            futures[i] = CompletableFuture.runAsync(() -> {
                Integer item = itemsAvailable.poll();
                if (item == null)
                    item = new Random().nextInt();

                task.run();

                itemsAvailable.add(item);

                System.out.println("Items available: " + itemsAvailable.size());
            }, threadPool);
        }

        return CompletableFuture.allOf(futures).thenRun(() -> System.out.println("Items available at last: " + itemsAvailable.size()));
    }

    @Override
    public void close() {
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(100, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

As in my code, I am creating a work stealing pool of size 10, Could anyone please let me know why no. of elements in ArrayDeque<> is exceeding 10 in this case?


Solution

  • From the Javadoc:

    [Array deques] are not thread-safe; in the absence of external synchronization, they do not support concurrent access by multiple threads.

    Use external synchronization, or select a different class which is specifically designed for concurrent access.