java multithreading serialization concurrency java.util.concurrent

Why does Java concurrent processing not work with newly instantiated objects, while it works with deserialized objects of the same class?


I am using java.util.concurrent.ExecutorService to run a calculation concurrently on all available processors. In the code below, the performParallelProcessing method of a MyProcessor instance creates a number of ProcessingExecutor instances and submits them to the ExecutorService, expecting a callback from each.

The processing happens in the performProcessing method of the ProcessingExecutor class. The data being processed are instances of the ComputationData class. They are either deserialized from the file system (if serialized data exist) or initialized as new instances.

Here is the problem:

When the ComputationData instances are deserialized from the file system, the concurrent processing behaves as I expect: it runs in parallel on all cores, using 100% of the processing resources.

When the ComputationData instances are newly initialized, the concurrent processing does not behave as I expect: it runs as if it were single-threaded, using about 15% of the processing resources.

My guess is that something is wrong with the newly initialized ComputationData instances, but I don't know what, or why concurrency fails for them while it works for their serialized and then deserialized versions. Any hints or ideas would be greatly appreciated.

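import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyProcessor {
    // volatile: written by worker threads in callBack, read by the submitting thread
    private volatile boolean processingFinished = false;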

    public void performParallelProcessing(){
        int count = 0;
        boolean continueProcessing = true;

        int nrOfProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(nrOfProcessors);

        while (continueProcessing){
            ProcessingExecutor task = new ProcessingExecutor(count);
            task.setCaller(this);
            es.submit(task);
            count++;

            if (!processingFinished){
                try{
                    Thread.sleep(50);
                }
                catch(SecurityException | InterruptedException e){
                    //Exception handling
                }
            }
            else{
                continueProcessing = false;
            }
        }
        es.shutdown(); // let the pool threads exit once the submitted tasks finish
    }

    public void callBack(ProcessingResult result) {
        if(result.allDataProcessed()){
            this.processingFinished = true;
        }
    }
}

import java.util.concurrent.Callable;

public class ProcessingExecutor implements Callable<Object> {
    private MyProcessor processor;
    private int count;

    public ProcessingExecutor(int count){
        this.count = count;
    }

    public Object call() {
        ProcessingResult result = null;
        try {
            result = performProcessing();
        }
        catch (SecurityException e) {
            //Exception handling
        }

        processor.callBack(result);
        return null;
    }

    public void setCaller(MyProcessor processor) {
        this.processor = processor;
    }

    public MyProcessor getCaller() {
        return this.processor;
    }

    private ProcessingResult performProcessing(){
        ComputationData data = null;

        if(serializedDataExist()){
            data = getSerializedData(count);
        }
        else{
            data = initializeNewData(count);
        }

        ProcessingResult result = new ProcessingResult(data, count);
        return result;
    }

    private ComputationData getSerializedData(int count){
        ComputationData data = null;
        // code to retrieve a ComputationData object from the file system
        // based on 'count' value.
        return data;
    }

    private ComputationData initializeNewData(int count){
        ComputationData data = null;
        // code to initialize a new instance of ComputationData class
        // based on 'count' value.
        return data;
    }

    private boolean serializedDataExist(){
        boolean dataFound = false;
        // code to verify whether serialized ComputationData objects are
        // present on the file system.
        return dataFound;
    }
}

Solution

  • Why do you need the Thread.sleep(50)? It is what turns the concurrent execution into a sequential one, especially if each computation takes 50 ms or less: the loop submits only one task every 50 ms, so each task finishes before the next is submitted and only one executor thread is busy at a time. My guess is that deserialization time plus computation time is longer than 50 ms, which would explain the higher CPU activity in the deserialized scenario: several tasks would then overlap in the executor threads. Try it without Thread.sleep(50), or at least with a much shorter sleep.
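One way to drop the sleep entirely is to submit all tasks up front and let the executor schedule them. The following is only a minimal sketch under an assumption the question does not confirm: that the number of tasks is known in advance. The class name BatchSubmitSketch and the methods processChunk and runAll are hypothetical stand-ins, not part of the original code; ExecutorService.invokeAll is the standard java.util.concurrent call that blocks until every submitted task has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchSubmitSketch {

    // Hypothetical stand-in for the real per-task computation:
    // returns 0 + 1 + ... + count.
    static long processChunk(int count) {
        long sum = 0;
        for (int i = 0; i <= count; i++) {
            sum += i;
        }
        return sum;
    }

    // Submits every task at once (no sleep between submissions),
    // waits for all of them, and returns the sum of their results.
    static long runAll(int nrOfTasks) {
        int nrOfProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(nrOfProcessors);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int count = 0; count < nrOfTasks; count++) {
                final int id = count;
                tasks.add(() -> processChunk(id));
            }
            long total = 0;
            // invokeAll returns only after every task has completed.
            for (Future<Long> future : es.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Let the pool threads exit so the JVM can terminate.
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(4));
    }
}
```

Because the tasks are queued immediately, the pool can keep all of its threads busy regardless of how short each task is. If the number of tasks is not known in advance, as with the allDataProcessed() check in the question, a different stopping mechanism would be needed; this sketch does not cover that case.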