java executorservice blockingqueue

How to gracefully wait for a job to finish with a BlockingQueue in Java


I am writing a job queue using BlockingQueue and ExecutorService. It waits for new data in the queue; when data is put into the queue, the executor service fetches it. The problem is that I use a loop to wait for the queue to have data, so CPU usage is very high. I am new to this API and not sure how to improve this.

ExecutorService mExecutorService = Executors.newSingleThreadExecutor();
// ArrayBlockingQueue requires a capacity; 16 is an arbitrary placeholder
BlockingQueue<T> mBlockingQueue = new ArrayBlockingQueue<>(16);

public void handleRequests() {
    Future<T> future = mExecutorService.submit(new WorkerHandler<>(mBlockingQueue, mQueueState));
    T value = null;
    try {
        value = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    if (mListener != null && value != null) {
        mListener.onNewItemDequeued(value);
    }
}

private static class WorkerHandler<T> implements Callable<T> {

    private final BlockingQueue<T> mBlockingQueue;
    private PollingQueueState mQueueState;

    WorkerHandler(BlockingQueue<T> blockingQueue, PollingQueueState state) {
        mBlockingQueue = blockingQueue;
        mQueueState = state;
    }

    @Override
    public T call() throws Exception {
        T value = null;
        while (true) {   // problem is here, this loop takes full cpu usage if queue is empty
            if (mBlockingQueue.isEmpty()) {
                mQueueState = PollingQueueState.WAITING;
            } else {
                mQueueState = PollingQueueState.FETCHING;
            }
            if (mQueueState == PollingQueueState.FETCHING) {
                try {
                    value = mBlockingQueue.take();
                    break;
                } catch (InterruptedException e) {
                    Log.e(TAG, e.getMessage(), e);
                    break;
                }
            }
        }
        return value;
    }
}

Any suggestions on how to improve this would be much appreciated!


Solution

  • You don't need to test whether the queue is empty. Just call take(): the thread blocks, without consuming CPU, until data is available.

    When an element is put on the queue, the thread wakes up and value is set.

    If you don't need to cancel the task you just need:

    @Override
    public T call() throws Exception {
        T value = mBlockingQueue.take();
        return value;
    }
    

    If you want to be able to cancel the task:

    @Override
    public T call() throws Exception {
        T value = null;
        while (value == null) {
            try {
                // Wait up to 50 ms; poll() returns null on timeout, so the loop retries.
                value = mBlockingQueue.poll(50L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Interrupted (for example by Future.cancel(true)): stop waiting.
                Log.e(TAG, e.getMessage(), e);
                break;
            }
        }
        return value;
    }
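
To try both variants outside the original project, here is a minimal self-contained sketch. The class name QueueWaitDemo, the helpers blockingWorker, pollingWorker, demoTake and demoCancel, the queue capacity of 4, and the 200 ms pauses are all assumptions made for illustration, not part of the question's code. It also lets InterruptedException propagate instead of logging with Log.e, so that it runs on a plain JVM.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; all names here are illustrative.
class QueueWaitDemo {

    // Blocking variant: take() parks the thread until an element is available.
    static <T> Callable<T> blockingWorker(BlockingQueue<T> queue) {
        return queue::take;
    }

    // Polling variant: wakes up every 50 ms and retries while the queue is empty.
    static <T> Callable<T> pollingWorker(BlockingQueue<T> queue) {
        return () -> {
            T value = null;
            while (value == null) {
                // poll() returns null on timeout and throws InterruptedException
                // if the worker thread is interrupted, e.g. by Future.cancel(true).
                value = queue.poll(50L, TimeUnit.MILLISECONDS);
            }
            return value;
        };
    }

    // Starts a blocking worker on an empty queue, then puts one item.
    // Returns the value the worker received.
    static String demoTake() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4); // capacity is arbitrary
        try {
            Future<String> future = executor.submit(blockingWorker(queue));
            Thread.sleep(200); // the worker is parked in take() here, not spinning
            queue.put("job-1");
            return future.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Starts a polling worker on an empty queue and cancels it.
    // Returns whether the future reports the task as cancelled.
    static boolean demoCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        try {
            Future<String> future = executor.submit(pollingWorker(queue));
            Thread.sleep(200);
            future.cancel(true); // interrupts the worker thread
            return future.isCancelled();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + demoTake());
        System.out.println("cancelled: " + demoCancel());
    }
}
```

In this sketch the worker thread stays idle while the queue is empty in both variants; the polling one simply wakes up briefly every 50 ms, which leaves room to check additional stop conditions inside the loop if you need them.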