Search code examples
java concurrency blockingqueue

how to terminate retrieval from a blocking queue


I have some code that executes several tasks using an Executor and a BlockingQueue. The results have to be returned as an iterator, because that is what the application I work on expects. However, there is a 1:N relationship between a task and the results it adds to the queue, so I cannot use ExecutorCompletionService. When hasNext() is called, I need to know whether all the tasks have finished and added all their results to the queue, so that I can stop retrieving results from it. Note that as soon as items are put on the queue, another thread should be able to consume them; ExecutorService.invokeAll() blocks until all tasks have completed, which is not what I want, and neither is a timeout. This was my first attempt; I am using an AtomicInteger just to demonstrate the point, even though it will not work. Could someone help me understand how I can solve this issue?

/**
 * Runs tasks on an {@link Executor} and exposes everything they produce as an {@link Iterable}.
 *
 * <p>Each task may put any number of results on a shared queue. The iterator blocks while
 * results may still arrive and ends once every task submitted through
 * {@link #execute(ExecutorTask)} has finished and the queue has been drained. Tasks should be
 * submitted before iteration starts: an iterator that finds no task in flight and an empty
 * queue reports that it is exhausted.
 *
 * @param <T> type of the results handed to the consumer
 */
public class ResultExecutor<T> implements Iterable<T> {
    /**
     * How long one blocking poll waits before the in-flight counter is checked again. This only
     * bounds the re-check latency; it never ends the iteration by itself.
     */
    private static final long POLL_INTERVAL_MILLIS = 50L;

    private final BlockingQueue<T> queue;
    private final Executor executor;
    /** Number of submitted tasks that have not finished yet (not the number of queued items). */
    private final AtomicInteger count;

    /**
     * @param executor executor on which submitted tasks are run
     */
    public ResultExecutor(Executor executor) {
        this.queue = new LinkedBlockingQueue<T>();
        this.executor = executor;
        this.count = new AtomicInteger();
    }

    /**
     * Submits a task and registers it as in flight until its {@code run()} returns or throws.
     *
     * @param task the task to run; must not be {@code null}
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public void execute(final ExecutorTask task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        // Register BEFORE handing the task over, so the iterator can never observe
        // "no task in flight" while this task is still waiting to start.
        count.incrementAndGet();
        try {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        task.run();
                    } finally {
                        // Decrement only after every put() of the task has happened; the
                        // iterator relies on this ordering to know the queue is complete.
                        count.decrementAndGet();
                    }
                }
            });
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The task will never run, so it must not keep the iterator waiting.
            count.decrementAndGet();
            throw e;
        }
    }

    public Iterator<T> iterator() {
        return new MyIterator();
    }

    /**
     * Iterator over the shared result queue. Intended for a single consuming thread; each
     * element is delivered to exactly one iterator.
     */
    public class MyIterator implements Iterator<T> {
        /** Element fetched by {@link #hasNext()} and not yet returned by {@link #next()}. */
        private T current;

        /**
         * Blocks until a result is available or all submitted tasks have finished.
         *
         * @return {@code true} if a result is ready for {@link #next()}; {@code false} if all
         *         tasks are done and the queue is empty, or if the waiting thread was
         *         interrupted (the interrupt status is restored in that case)
         */
        public boolean hasNext() {
            while (current == null) {
                // Read the counter BEFORE polling. If it is already zero, every result was
                // queued before this read, so one non-blocking poll decides whether we are done.
                final boolean allTasksDone = count.get() <= 0;
                try {
                    current = allTasksDone
                            ? queue.poll()
                            : queue.poll(POLL_INTERVAL_MILLIS,
                                    java.util.concurrent.TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (current == null && allTasksDone) {
                    return false;
                }
            }
            return true;
        }

        /**
         * @return the next result
         * @throws java.util.NoSuchElementException if no further result will arrive
         */
        public T next() {
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            final T ret = current;
            current = null;
            return ret;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

    /**
     * Sample task: sleeps for a random time of up to 500 ms and then queues five results of the
     * form {@code name:delay:index}.
     */
    public class ExecutorTask implements Runnable {
        private final String name;

        public ExecutorTask(String name) {
            this.name = name;
        }

        /** Returns a random integer in the range {@code [0, n]}. */
        private int random(int n) {
            return (int) Math.round(n * Math.random());
        }

        // The demo always produces Strings; the cast is only safe when T is String
        // (or a supertype of it).
        @SuppressWarnings("unchecked")
        public void run() {
            try {
                int random = random(500);
                Thread.sleep(random);
                for (int i = 1; i <= 5; i++) {
                    queue.put((T) (name + ":" + random + ":" + i));
                }
            } catch (InterruptedException e) {
                // Stop producing and keep the interrupt visible to the executor's worker thread.
                Thread.currentThread().interrupt();
            }
        }
    }

}

And the calling code looks like:

    // Held as an ExecutorService (not a plain Executor) so that the pool can be shut down.
    java.util.concurrent.ExecutorService e = Executors.newFixedThreadPool(2);
    ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);

    resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
    resultExecutor.execute(resultExecutor.new ExecutorTask("B"));
    // Nothing else will be submitted. shutdown() still lets the two queued tasks run, and the
    // pool's non-daemon worker threads exit afterwards so the JVM can terminate.
    e.shutdown();

    Iterator<Result> iter = resultExecutor.iterator();
    while (iter.hasNext()) {
        System.out.println(iter.next());
    }

Solution

  • Here is an alternate solution that uses a non-blocking queue with wait/notify, AtomicInteger and a callback.

    /**
     * Producer/consumer demo: {@link #NO_THREADS} tasks each add five strings to a plain
     * (non-blocking) queue, and the calling thread consumes them through an iterator that ends
     * once every task has reported completion via {@link #callback(String)}.
     *
     * <p>All access to the queue happens while holding {@code syncObject}; the consumer waits on
     * that monitor and producers notify it.
     */
    public class QueueExecutor implements CallbackInterface<String> {

        public static final int NO_THREADS = 26;

        /** Monitor guarding {@link #queue}; also used for wait/notify between the threads. */
        private final Object syncObject = new Object();
        /** Number of tasks that have not reported completion yet. */
        private final AtomicInteger count = new AtomicInteger();
        final Queue<String> queue = new LinkedList<String>();

        /**
         * Submits all tasks, then consumes and prints every queued item on the calling thread
         * until all tasks are done, and finally prints how many items were handled.
         */
        public void execute() {
            count.set(NO_THREADS);
            ExecutorService executor = Executors.newFixedThreadPool(NO_THREADS / 2);
            try {
                for (int i = 0; i < NO_THREADS; i++) {
                    executor.execute(new ExecutorTask<String>("" + (char) ('A' + i), queue, this));
                }
            } finally {
                // Already-submitted tasks still run; afterwards the worker threads exit
                // instead of keeping the JVM alive.
                executor.shutdown();
            }

            Iterator<String> iter = new QueueIterator<String>(queue, count);
            int handled = 0;
            while (iter.hasNext()) {
                System.out.println(iter.next());
                handled++;
            }

            System.out.println("Handled " + handled + " items");
        }

        /**
         * Called by a task when it is finished: prints the message, marks one task as done and
         * wakes the consumer so that it can re-check the termination condition.
         *
         * @param result completion message supplied by the task
         */
        public void callback(String result) {
            System.out.println(result);
            synchronized (syncObject) {
                count.decrementAndGet();
                syncObject.notify();
            }
        }

        /**
         * Iterator over the shared queue that blocks while the queue is empty and tasks are
         * still outstanding.
         */
        public class QueueIterator<T> implements Iterator<T> {
            private final Queue<T> queue;
            private final AtomicInteger count;

            /**
             * @param queue queue to consume from; accessed only while holding the monitor
             * @param count number of tasks that have not finished yet
             */
            public QueueIterator(Queue<T> queue, AtomicInteger count) {
                this.queue = queue;
                this.count = count;
            }

            /**
             * @return {@code true} as soon as the queue holds an item; {@code false} once the
             *         queue is empty and no task is outstanding, or if the waiting thread was
             *         interrupted (the interrupt status is restored in that case)
             */
            public boolean hasNext() {
                synchronized (syncObject) {
                    while (queue.isEmpty()) {
                        if (count.get() == 0) {
                            return false;
                        }
                        try {
                            syncObject.wait();
                        } catch (InterruptedException e) {
                            // Returning is required here: retrying wait() with the interrupt
                            // flag set would throw again immediately.
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                    return true;
                }
            }

            /**
             * @return the head of the queue
             * @throws java.util.NoSuchElementException if no further item will arrive
             */
            public T next() {
                synchronized (syncObject) {
                    if (!hasNext()) {
                        throw new java.util.NoSuchElementException();
                    }
                    return queue.remove();
                }
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }

        }

        /**
         * Sample producer: waits one second, adds five items to the queue with a short random
         * pause before each, and then reports completion through the callback.
         */
        class ExecutorTask<T> implements Runnable {
            private final String name;
            private final Queue<T> queue;
            private final CallbackInterface<T> callback;

            public ExecutorTask(String name, Queue<T> queue,
                    CallbackInterface<T> callback) {
                this.name = name;
                this.queue = queue;
                this.callback = callback;
            }

            // The demo always produces Strings; the casts are only safe when T is String.
            @SuppressWarnings("unchecked")
            public void run() {
                try {
                    Thread.sleep(1000);
                    Random randomX = new Random();
                    for (int i = 0; i < 5; i++) {
                        // Sleep outside the monitor so the consumer and the other producers
                        // are not blocked while this task is merely simulating work.
                        Thread.sleep(randomX.nextInt(10) + 1);
                        synchronized (syncObject) {
                            queue.add((T) (name + ":" + ":" + i));
                            syncObject.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Always report completion, even after an interrupt; otherwise the
                    // outstanding-task counter never reaches zero and the consumer waits forever.
                    callback.callback((T) (name + ": Done"));
                }
            }
        }

    }
    
    /**
     * Receives a single notification value of type {@code T}.
     *
     * @param <T> type of the value passed to {@link #callback(Object)}
     */
    public interface CallbackInterface<T> {
        /**
         * Handles one notification.
         *
         * @param result the value supplied by the caller
         */
        void callback(T result);
    }
    

    And the calling code is simply:

        QueueExecutor exec = new QueueExecutor();
        exec.execute();