
Serialized access to Collection with the use of ConcurrentLinkedQueue<T>


I have the following question concerning Java 7 ConcurrentLinkedQueue. Let us assume that I have the following class:

public class Blah {
    private ConcurrentLinkedQueue<String> queue;

    public Blah() {
        queue = new ConcurrentLinkedQueue<String>();
    }

    public void produce(String action, String task) throws InterruptedException {
        synchronized(queue) {
            while(queue.size() >= 8) 
                queue.wait();
            queue.add(action + "#" + task);
            queue.notifyAll();
        }
    }

    public void consume() throws InterruptedException {
        synchronized(queue) {
            while(queue.size() <= 0)
                queue.wait();
            String element = queue.poll();
            StringTokenizer strTok = new StringTokenizer(element, "#");
            String action = strTok.nextToken();
            String task = strTok.nextToken();
            /**
             * Operate on request
             */
            queue.notifyAll();
        }
    }
}

The produce() and consume() methods will be called by concurrent threads to add tasks to the queue and remove them from it. I implemented them this way so that additions to and removals from my queue are serialized. Is this synchronization required, or does ConcurrentLinkedQueue take care of it? I am asking because I do not want to reduce the performance of my code.

Thank you, Nick


Solution

  • TL;DR: You are using a Queue specifically designed to be non-blocking as a BlockingQueue.
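    To make "non-blocking" concrete, here is a minimal sketch (the class and method names are made up for illustration). It shows two things about ConcurrentLinkedQueue: individual operations such as add() are thread-safe without any external lock, and poll() on an empty queue returns null immediately instead of waiting. The lambda needs Java 8 or later; on Java 7 an anonymous Runnable would do the same job.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class NonBlockingQueueDemo {
    // Several threads add elements with no synchronized block around the queue.
    // Returns the number of elements in the queue after all threads finish.
    public static int concurrentAdds(int threads, int perThread) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.add("item"); // thread-safe on its own
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.size();
    }

    // poll() never waits: on an empty queue it returns null right away.
    public static boolean pollOnEmptyReturnsNull() {
        return new ConcurrentLinkedQueue<String>().poll() == null;
    }

    public static void main(String[] args) {
        System.out.println("elements after concurrent adds: " + concurrentAdds(4, 1000));
        System.out.println("poll on empty returned null: " + pollOnEmptyReturnsNull());
    }
}
```

    So the queue already serializes its own operations, but it has no capacity bound and no way to wait. That is why the code in the question had to add wait()/notifyAll() on top of it.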

    Your code can be rewritten as:

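    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class Blah {
        // Bounded at 8 elements, matching the size() >= 8 check in the question.
        private final BlockingQueue<String> queue;

        public Blah() {
            queue = new LinkedBlockingQueue<>(8);
        }

        public void produce(String action, String task) throws InterruptedException {
            // Blocks while the queue is full, then enqueues exactly one element.
            queue.put(action + "#" + task);
        }

        public void consume() throws InterruptedException {
            // Consumer loop: runs until the calling thread is interrupted.
            while (true) {
                // Blocks while the queue is empty.
                final String[] data = queue.take().split("#");
                final String action = data[0];
                final String task = data[1];
                // Operate on the request here.
            }
        }
    }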
    

    The BlockingQueue is bounded at 8 elements. put will block if the queue is full. take will block if the queue is empty.

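    No explicit synchronization is required: the queue does its own locking and waiting internally, so the synchronized blocks and the wait()/notifyAll() calls can go.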
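    As a rough illustration of the bound, the sketch below (class and method names are hypothetical) uses the non-blocking and timed variants offer() and poll(timeout, unit), so it finishes on its own instead of blocking the way put and take would.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    // Fills a queue bounded at `capacity`, then reports whether one more
    // non-blocking offer() was rejected. put() would block here instead.
    public static boolean extraOfferRejected(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("action#task" + i);
        }
        return !queue.offer("one#too-many");
    }

    // Polls an empty queue with a short timeout. take() would wait forever;
    // the timed poll() gives up and returns null.
    public static boolean timedPollOnEmptyReturnsNull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(8);
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("extra offer rejected: " + extraOfferRejected(8));
        System.out.println("timed poll on empty returned null: " + timedPollOnEmptyReturnsNull());
    }
}
```

    offer() and the timed poll() are worth considering when a producer or consumer should give up after a while rather than wait indefinitely.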

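    Also, StringTokenizer is a legacy class. It is not formally deprecated, but its Javadoc discourages using it in new code and recommends String.split or java.util.regex instead. Better still, I would suggest you use a class something like: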

    public class Data {
        private final String action;
        private final String task;
    
        public Data(final String action, final String task) {
            this.action = action;
            this.task = task;
        }
    
        public String getAction() {
            return action;
        }
    
        public String getTask() {
            return task;
        }
    }
    

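    Use a class like this to exchange data between the producer and the consumer, with the queue declared as BlockingQueue<Data>. There is no reason to build Strings and parse them again.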
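    Putting the two suggestions together might look like the sketch below. It is only an illustration: the class name DataQueueDemo and the method runOnce are made up, the Data class is nested so the example is self-contained, and the lambda needs Java 8 or later (an anonymous Runnable works on Java 7).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DataQueueDemo {
    // Same shape as the Data class above, nested to keep the example in one file.
    static final class Data {
        private final String action;
        private final String task;

        Data(String action, String task) {
            this.action = action;
            this.task = task;
        }

        String getAction() { return action; }
        String getTask() { return task; }
    }

    // One producer thread puts `count` Data objects; the calling thread takes
    // them and returns "action/task" strings in the order they were received.
    public static List<String> runOnce(int count) {
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(8);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(new Data("action" + i, "task" + i)); // blocks while full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                Data d = queue.take(); // blocks while empty
                seen.add(d.getAction() + "/" + d.getTask());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(20));
    }
}
```

    With a single producer the elements come out in insertion order, since LinkedBlockingQueue is FIFO. No "#" separator is involved, so an action or task that itself contains "#" cannot break the parsing.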