java, executorservice, blockingqueue, linkedblockingqueue

Runnable locked (park) using ExecutorService and BlockingQueue


Note: I understand the site rules, but I can't post all of the code (it is large and complex).

I put DIFFERENT code on GitHub (the real code is too large and you don't need all of it here) that reproduces the problem shown in the video. The main class is joseluisbz.mock.support.TestOptimalDSP and the switching class is joseluisbz.mock.support.runnable.ProcessorDSP.

Please don't recommend another jar or external library for this code.

I wish I could be more specific, but I don't know which part to extract and show. Before you close this question: I am willing to refine it if someone tells me where to look (technical detail).

I made a video to show my issue.

To formulate the question, I also made a diagram to show the situation.

My program has a JTree showing the relations between Workers.


I have a diagram of the interaction between the threads. Their lifecycle is controlled with ExecutorService executorService = Executors.newCachedThreadPool(); and List<Future<?>> listFuture = Collections.synchronizedList(new ArrayList<>());

Each Runnable is started in its constructor like this: listFuture().add(executorService().submit(this)); The queues are created like this: BlockingQueue<Custom> someBlockingQueue = new LinkedBlockingQueue<>();


My diagram shows each Worker's parent, if it has one. It also shows the write relationships between the BlockingQueues.

RunnableStopper stops the related runnables that a Worker holds as properties. RunnableDecrementer, RunnableIncrementer and RunnableFilter each run a loop that handles every Custom received on their BlockingQueue. For each Custom they create a RunnableProcessor (it has no loop, but its processing takes a long time; once the task is finished it should be collected by the GC).

Internally, RunnableIncrementer has a Map: Map<Integer, List<Custom>> mapListDelayedCustom = new HashMap<>();//Collections.synchronizedMap(new HashMap<>());

When a Custom arrives, I need to obtain the List of the last received Custom objects: List<Custom> listDelayedCustom = mapListDelayedCustom.putIfAbsent(custom.getCode(), new ArrayList<>()); I control the size of the list (it does not grow indefinitely).

My code stops working when I add the following lines:

if (listDelayedCustom.size() > SomeValue) {
  //No operation has yet been included in if sentence
}

But with the lines commented out, it doesn't block:

//if (listDelayedCustom.size() > SomeValue) {
//  //No operation has yet been included in if sentence
//}

What could be blocking my Runnable? It makes no sense that adding the lines above (an if statement that only evaluates the size of a list) makes it stop working.

Any advice on how to make my question more specific?


Solution

  • First, the way you set thread names is wrong. You use this pattern:

    public class Test
    {
        public static class Task implements Runnable
        {
            public Task()
            {
                Thread.currentThread().setName("Task");
            }
    
            @Override
            public void run()
            {
                System.out.println("Task: "+Thread.currentThread().getName());
            }
        }
    
        public static void main(String[] args)
        {
            new Thread(new Task()).start();
            System.out.println("Main: "+Thread.currentThread().getName());
        }
    }
    

    which gives the (undesired) result:

    Main: Task
    Task: Thread-0
    

    It's incorrect because the Task constructor runs on the calling thread, before the new thread has started, so you're changing the name of the calling thread, not the one of the spawned thread. You should set the name in the run() method.

    As a result, the thread names in your screenshot are wrong.
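
A minimal sketch of the corrected pattern, assuming the same Task shape as in the example above. The class name ThreadNameDemo, the observedName field and the helper runTaskAndGetName() are hypothetical and exist only to make the sketch self-contained. The name is set inside run(), so it applies to the thread that actually executes the task:

```java
public class ThreadNameDemo {

    /** Sets the thread name inside run(), i.e. on the thread that executes the task. */
    public static class Task implements Runnable {
        // Name seen from inside run(); volatile so the caller can read it safely.
        private volatile String observedName;

        @Override
        public void run() {
            Thread.currentThread().setName("Task");
            observedName = Thread.currentThread().getName();
            System.out.println("Task: " + observedName);
        }

        public String getObservedName() {
            return observedName;
        }
    }

    /** Hypothetical helper: runs one Task on a fresh thread and returns the name it saw. */
    public static String runTaskAndGetName() {
        Task task = new Task();
        Thread thread = new Thread(task);
        thread.start();
        try {
            thread.join(); // wait until run() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.getObservedName();
    }

    public static void main(String[] args) {
        runTaskAndGetName();
        // The calling thread keeps its own name; only the spawned thread was renamed.
        System.out.println("Main: " + Thread.currentThread().getName());
    }
}
```

With this version the task line reads "Task: Task" and the calling thread's name is left untouched. If the task runs on a pooled thread instead (for example from Executors.newCachedThreadPool()), the name stays on that thread after the task ends, so each task should set its own name at the start of run().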

    Now the real issue. In WorkerDSPIncrement, you have this line:

    List<ChunkDTO> listDelayedChunkDTO = mapListDelayedChunkDTO.putIfAbsent(chunkDTO.getPitch(), new ArrayList<>());
    

    The documentation for putIfAbsent() says:

    If the specified key is not already associated with a value (or is mapped to null) associates it with the given value and returns null, else returns the current value.

    Since the map is initially empty, the first time you call putIfAbsent(), it returns null and assigns it to listDelayedChunkDTO.
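
The behaviour can be checked in isolation. The sketch below uses a plain HashMap with String standing in for ChunkDTO; the class and method names are hypothetical. It also shows one possible fix, Map.computeIfAbsent(), which returns the list that ends up mapped to the key (existing or newly created) rather than the previous value:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PutIfAbsentDemo {

    /** Mirrors the original pattern: the first call for a key returns null. */
    public static List<String> viaPutIfAbsent(Map<Integer, List<String>> map, int key) {
        return map.putIfAbsent(key, new ArrayList<>());
    }

    /** Returns the list mapped to the key, creating it first if the key is absent. */
    public static List<String> viaComputeIfAbsent(Map<Integer, List<String>> map, int key) {
        return map.computeIfAbsent(key, k -> new ArrayList<>());
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> map = new HashMap<>();
        System.out.println(viaPutIfAbsent(map, 1));     // null: key 1 was absent
        System.out.println(viaPutIfAbsent(map, 1));     // []: key 1 is now mapped
        System.out.println(viaComputeIfAbsent(map, 2)); // []: created and returned
    }
}
```

In the original code this would mean replacing putIfAbsent(chunkDTO.getPitch(), new ArrayList<>()) with computeIfAbsent(chunkDTO.getPitch(), k -> new ArrayList<>()), assuming the intent is to always get a usable list back.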

    Then you create a ProcessorDSP object:

    ProcessorDSP processorDSP = new ProcessorDSP(controlDSP, upNodeDSP, null,
       dHnCoefficients, chunkDTO, listDelayedChunkDTO, Arrays.asList(parent.getParentBlockingQueue()));
    

    It means you pass null as the listDelayedChunkDTO parameter. So when this line executes in ProcessorDSP:

    if (listDelayedChunkDTO.size() > 2) {
    

    it throws a NullPointerException and the runnable stops.
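
This also explains why nothing appears on the console. Assuming the runnables are started with executorService.submit(...) as described in the question, an exception thrown inside run() is stored in the returned Future and is only reported when Future.get() is called. The sketch below reproduces that with a hypothetical class SilentFailureDemo and a null list standing in for listDelayedChunkDTO:

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SilentFailureDemo {

    /** Submits a task that dereferences a null list and returns the cause reported by Future.get(). */
    public static Throwable submitAndGetCause() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            List<String> listDelayed = null; // stands in for the null returned by putIfAbsent()
            Future<?> future = executorService.submit(() -> {
                // Throws NullPointerException; submit() stores it in the Future
                // instead of printing a stack trace.
                if (listDelayed.size() > 2) {
                    System.out.println("not reached");
                }
            });
            try {
                future.get(); // rethrows the stored failure wrapped in ExecutionException
                return null;  // the task completed normally
            } catch (ExecutionException e) {
                return e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Cause: " + submitAndGetCause());
    }
}
```

Calling get() on the entries of listFuture, or wrapping the body of run() in a try/catch that logs the exception, would have exposed the NullPointerException directly.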