Search code examples
javadeadlockblockingqueue

Custom LinkedBlockingQueue deadlock


I have been using custom blockingqueue inside ThreadExecutorPool, but some times task workers do not take task and dispacher thread does not put a new task into queue.

I wonder following custom blocking queue implementation causes deadlock. Is there any wrong with this code? Is is better to and synchronized block for add() and take() methods.

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.ttech.utils.alarm.Alarm;
import com.ttech.utils.alarm.AlarmInterface;
import com.ttech.utils.counter.Counter;
import com.ttech.utils.counter.SNMPAgent;

public class WorkerQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 1L;

    public Integer lowThreshold;

    public Integer highThreshold;

    public Integer capacity;

    public String name;

    public String type;

    public Counter counter = null;

    public boolean writeAlarmLog;

    public static final Logger logger = Logger.getLogger(WorkerQueue.class);

    public static Alarm HighThresholdAlarm = null;
    public static Alarm CapacityAlarm = null;

    // Check the size here and clear capacity and high threshold alarms in case
    public E take() throws InterruptedException {
        E data = super.take();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {            
            if(!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }

    public E poll() {
        E data = super.poll();
        counter.setNewValue(super.size());
        if (super.size() == lowThreshold) {
            if(!this.writeAlarmLog) {
                HighThresholdAlarm.clear(name);
                CapacityAlarm.clear(name);
            } else {
                HighThresholdAlarm.clearLog(name, "Queue High Threshold");
                CapacityAlarm.clearLog(name, "Queue Capacity Overload");
            }
        }
        return data;
    }


    public int drainTo(Collection<? super E> c, int maxElements){
       int size = super.drainTo(c,maxElements);       
       counter.setNewValue(super.size());       
       return size;
    }

    // During adding the data to queue check capacity and high threshold raise alarm in case
    public boolean add(E data) {
        Boolean rc = true;

        if (capacity > 0) {
            if (this.size() >= capacity) {
                logger.error("Queue " + name + " is over capacity");
                if(!this.writeAlarmLog)
                    CapacityAlarm.raise(name);
                else
                    CapacityAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue Capacity Overload");
                return false;
            }
        }

        if (!super.add(data)) {
            logger.error("Cannot add data to queue:" + name);
            rc = false;
        } else {
            counter.setNewValue(super.size());
        }

        if (highThreshold == super.size()) {


            if(!this.writeAlarmLog)
                HighThresholdAlarm.raise(name);
            else
                HighThresholdAlarm.raiseLog(AlarmInterface.AS_CRITICAL, name, "Queue High Threshold");
        }

        return rc;
    }
}

Solution

  • ThreadPoolExecutor does not add tasks to its work queue. It offers them and if not accepted passes them to the configured RejectedExecutionHandler. By default this is the abort policy handler, which causes a RejectedExecutionException to be thrown.

    The add method in your custom queue will never be called.

    If you want to track changes in the number of in-flight tasks you have, I would suggest overriding the beforeExecute or afterExecute method of the executor itself. The number of active tasks can be obtained from getActiveCount.