Tags: java, multithreading, synchronization, executorservice, executor

How to synchronize 2 jobs/processes


I have a server application that processes two file-processing jobs with completely different flows. The client triggers either job 1 or job 2, so either of the two can arrive at any interval. Since the input files are picked up from a folder, I want to synchronize the jobs: if, for example, job 2 is triggered again shortly after its first trigger, the files from the first run may not have been processed yet and should not be picked up again. I would also rather wait for the first executor (ExecutorService) to finish before starting another job/executor.

I have tried using a map that stores the job type and its executor, and synchronizing on the map, but I am not sure how to proceed. Some code has been left out for clarity.

Server class:

public class ProcessServer {

    public static void main(String[] args) {
        try {
            Integer port = Integer.parseInt(Configuration.getProperty("Environment", "PORT"));
            ServerSocket serverSocket = new ServerSocket(port);
            LOG.info("Process Server listening on PORT: " + port);
            while (true) {
                Socket socket = serverSocket.accept();
                new Thread(new ProcessEvent(socket)).start();
            }
        } catch (Throwable th) {
            LOG.error("Exception occurred starting server.", th);
        }
    }
}

ProcessEvent Class:

public class ProcessEvent implements Runnable {

    // code to extract the argument (event/job type) from the socket stream

    private void processEvent(Event event) {
        switch (event.getType()) {
            case 1:
                new ProcessJob1().execute(MAP1);
                break;
            case 2:
                new ProcessJob2().execute(MAP2);
                break;
            default:
                break;
        }
    }
}

Job Class 1:

public class ProcessJob1 extends Job {

    private static final Map<String, ExecutorService> isRunning = new HashMap<String, ExecutorService>();

    @Override
    public void execute(Map<String, Object> jobData) {

        String type = (String) jobData.get(TYPE);
        ExecutorService executor = null;

        synchronized (isRunning) {
            executor = isRunning.get(type);
            // a previous run of this job type is still in progress: skip
            if (executor != null && !executor.isTerminated()) {
                return;
            }

            executor = Executors.newFixedThreadPool(MAX_THREAD_CNT);
            isRunning.put(type, executor);
        }

        File[] inputFiles = getValidFiles();
        if (inputFiles.length > 0) {
            for (File inputFile : inputFiles) {
                executor.execute(new ProcessFileTask1(inputFile));
            }
        }
        executor.shutdown();
    }

}

Job Class 2:

public class ProcessJob2 extends Job {

    private static ExecutorService executor = null;

    @Override
    public void execute(Map<String, Object> jobData) {
        // a previous run is still in progress: skip
        if (executor != null && !executor.isTerminated()) {
            return;
        }

        executor = Executors.newFixedThreadPool(MAX_THREAD_CNT);

        File[] inputFiles = getValidFiles();
        if (inputFiles.length > 0) {
            for (File inputFile : inputFiles) {
                executor.execute(new ProcessFileTask2(inputFile));
            }
        }
        // shut down even when no files were found, so isTerminated()
        // eventually becomes true and later runs are not blocked forever
        executor.shutdown();
    }

}


Solution

  • Instead of creating a new executor every time, use one shared executor with a single-threaded approach: Executors#newSingleThreadExecutor(). Submitted jobs are then queued and run one at a time.

    public class ProcessJob1 extends Job {

        // one shared executor for both job types; use
        // Executors.newSingleThreadExecutor() if the file tasks themselves
        // must also run strictly one at a time
        private static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD_CNT);

        // starts at 0, so the very first job is never blocked
        private static CountDownLatch countDownLatch = new CountDownLatch(0);

        @Override
        public void execute(Map<String, Object> jobData) {
            try {
                // wait until every file task of the previous job has finished
                countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            File[] inputFiles = getValidFiles();
            if (inputFiles.length > 0) {
                countDownLatch = new CountDownLatch(inputFiles.length);
                for (File inputFile : inputFiles) {
                    switch ((Integer) jobData.get(TYPE)) {
                        case 1:
                            executor.execute(new ProcessFileTask1(inputFile, countDownLatch));
                            break;
                        case 2:
                            executor.execute(new ProcessFileTask2(inputFile, countDownLatch));
                            break;
                        default:
                            break;
                    }
                }
            }
        }

    }
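    The single-shared-executor idea can be seen in isolation in a minimal, self-contained sketch (class and task names here are made up for illustration): a newSingleThreadExecutor() queues submitted jobs and runs them strictly in order, so a second job can never overlap the first.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SingleExecutorDemo {
        public static void main(String[] args) throws InterruptedException {
            // one worker thread: submitted jobs queue up and run one at a time
            ExecutorService executor = Executors.newSingleThreadExecutor();
            List<String> order = new ArrayList<>(); // touched only by the worker thread

            executor.execute(() -> {
                sleepQuietly(200);       // job 1 is slow
                order.add("job1 done");
            });
            executor.execute(() -> order.add("job2 done")); // queued behind job 1

            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println(order); // prints [job1 done, job2 done]
        }

        private static void sleepQuietly(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    ```

    Even though job 2 is submitted almost immediately after job 1, the single worker thread guarantees job 1 completes first.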

    You can use a CountDownLatch to achieve this.

    Initially the count is zero, so the first call to await() returns immediately.

    Before submitting a job's tasks, you reset the latch to the number of files.

    Pass the latch to each ProcessFileTask and call latch.countDown() once the task has finished its work.

    The next job then enters countDownLatch.await() and waits until all previous tasks have completed; if they are already done, await() returns immediately.

    Also, instead of branching in ProcessEvent as above, branch here inside the job. That gives you a single place to control the synchronization between threads.
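The latch sequence described above can be sketched end to end (the file count and messages are illustrative): the latch starts at zero so the first job passes await() immediately, each finished file task counts the latch down, and the next job blocks in await() until the previous batch is done.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // count 0: await() returns immediately, so the first job never blocks
        CountDownLatch latch = new CountDownLatch(0);
        latch.await();

        // "job 1": reset the latch to the number of files (here 3)
        int files = 3;
        latch = new CountDownLatch(files);
        final CountDownLatch jobLatch = latch; // effectively-final copy for the lambda
        for (int i = 0; i < files; i++) {
            executor.execute(() -> {
                // ... process one file ...
                jobLatch.countDown(); // signal that this file is done
            });
        }

        // "job 2" arriving later blocks here until all 3 files are done
        latch.await();
        System.out.println("all files processed, next job may start");
        executor.shutdown();
    }
}
```

Note that because the latch field is replaced for each batch, the tasks must be handed the latch instance for their own batch, exactly as the solution above passes countDownLatch into each ProcessFileTask.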