java multithreading scheduler thread-priority

Scheduling periodic threads with priority


I have to implement a multithreaded application in Java 8.

My application should run some threads periodically, say once every 30 minutes. Each thread should call an API, fetch the data from it, and save it into the database. The database is organized as follows:

Table A
Table B
Table C
Table D
Table E
Table F
Table G
Table H

Tables F, G and H have foreign keys to Tables A, B, C, D and E.

This leads to one thread per table to update:

Thread A -> Table A
Thread B -> Table B
Thread C -> Table C
Thread D -> Table D
Thread E -> Table E
Thread F -> Table F
Thread G -> Table G
Thread H -> Table H

Since some tables have foreign keys to other tables, the threads should not all start at the same time: Threads A, B, C, D and E should start first, and only once they have finished should Threads F, G and H start.

I have searched a lot on the internet for a possible solution, but I haven't found the right one for my situation.

For the moment I have implemented my code with ScheduledThreadPoolExecutor, whose scheduleWithFixedDelay method ensures that the threads are executed once every 30 minutes. Even though I schedule every thread with a different priority, they all start at the same time, and I get exceptions while executing inserts/updates on the tables with foreign keys.

Here is a snippet of my code that might help to understand.

The Thread-like classes look like this:

public class AllarmReasonService implements Runnable {

    @Override
    public void run() {
        AllarmReasonDAO dao = new AllarmReasonDAO();
        PagedResultContainer<AllarmReason> allarmReasonContainer = AllarmReasonApi.getInstance().getAllarmReasons();
        for(Iterator<AllarmReason> iterator = allarmReasonContainer.getData().iterator(); iterator.hasNext();){
            AllarmReason allarmReason = iterator.next();
            dao.insertOrUpdateAllarmReason(allarmReason);
        }
    }
}

The threadExecutor:

public void threadExecutor(){
    initializeThreadMap();
    Stream<Entry<String, Integer>> sorted = threadTimerMap.entrySet().stream().sorted(Entry.comparingByValue());
    scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadTimerMap.size());

    for(Iterator<Entry<String, Integer>> entryIterator = sorted.iterator(); entryIterator.hasNext();){
        Entry<String, Integer> entry = entryIterator.next();
        Runnable service = threadMap.get(entry.getKey());
        Thread thread = new Thread(service);
        thread.setPriority(entry.getValue());
        scheduler.scheduleWithFixedDelay(thread, 0, 30 * 60 * 1000, TimeUnit.MILLISECONDS);
    }
}

private void initializeThreadMap(){
    threadMap.clear();
    threadMap.put("causaliAllarme", new AllarmReasonService());
    threadMap.put("causaliLavorazione", new ManufactureReasonService());
    threadMap.put("celle", new CellService());
    threadMap.put("dipendenti", new EmployeeService());
    threadMap.put("ordiniLavorazione", new ManufactureOrderService());
    threadMap.put("parzialiLavorazione", new ManufacturePartialService());
    threadMap.put("qualita", new QualityService());
    threadMap.put("qualitaRilevata", new QualityDetectionService());
}

Finally, the threadTimerMap has key = name of the module/thread (e.g. causaliAllarme) and value = priority, where the priority is higher for modules that should be updated first.

With this setup, I am not able to run the most important threads first, even if I set a certain priority.

Consider that I am not an expert in threads and multithreading.

Any help would be appreciated. Thanks in advance.


Solution

  • For such situations I'd advise using CompletableFuture.

    // Declare your pool. Passing a ThreadFactory that names the threads is
    // very helpful when debugging. ThreadFactoryBuilder comes from Guava.
    int corePoolSize = 10;
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(corePoolSize,
          new ThreadFactoryBuilder().setNameFormat("Your-thread-%d").setDaemon(true).build());

    List<CompletableFuture<Void>> dependencies = new ArrayList<>();

    // Stage 1: submit the tasks for the tables without foreign keys
    dependencies.add(CompletableFuture.runAsync(new AllarmReasonService(), pool));
    dependencies.add(CompletableFuture.runAsync(new ManufactureReasonService(), pool));
    // ...
    // do the same with all your stage-1 tasks

    try {
        // Wait until stage 1 has completed
        for (CompletableFuture<Void> f : dependencies) {
            f.get();
        }

        // Stage 2: runs only if every stage-1 task succeeded.
        // The two services below are assumed to be dependent ones; use
        // whichever services write to the tables with foreign keys.
        CompletableFuture.runAsync(new ManufactureOrderService(), pool);
        CompletableFuture.runAsync(new ManufacturePartialService(), pool);
        // other required ...
    } catch (InterruptedException | ExecutionException e) {
        // log or re-throw; stage 2 is skipped because stage 1 failed
        pool.shutdownNow();
    }
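
The snippet above runs the two stages once, while the question asks for a run every 30 minutes. One possible way to combine the two is sketched below: a single timer thread triggers the whole two-stage run with scheduleWithFixedDelay, and the actual work happens on a separate worker pool, so the blocking wait never occupies a worker thread. The class TwoStageSync and its method names are hypothetical, not part of the original code, and the task lists stand in for the Runnable services from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the class and method names are illustrative only.
class TwoStageSync {

    // Starts all tasks in parallel on the worker pool, then waits for them.
    // join() throws an unchecked CompletionException as soon as it reaches
    // a task that failed.
    static void runAll(List<Runnable> tasks, ExecutorService workers) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            futures.add(CompletableFuture.runAsync(task, workers));
        }
        for (CompletableFuture<Void> future : futures) {
            future.join();
        }
    }

    // One full synchronization: stage 2 starts only after stage 1 has completed.
    static void runOnce(List<Runnable> stage1, List<Runnable> stage2,
                        ExecutorService workers) {
        runAll(stage1, workers);
        runAll(stage2, workers);
    }

    // Repeats the two-stage run. The delay is measured from the end of one
    // run to the start of the next.
    static ScheduledFuture<?> schedule(ScheduledExecutorService timer,
                                       ExecutorService workers,
                                       List<Runnable> stage1, List<Runnable> stage2,
                                       long delay, TimeUnit unit) {
        return timer.scheduleWithFixedDelay(() -> {
            try {
                runOnce(stage1, stage2, workers);
            } catch (RuntimeException e) {
                // An exception escaping this Runnable would suppress all later runs.
                e.printStackTrace();
            }
        }, 0, delay, unit);
    }
}
```

With this sketch, a call such as TwoStageSync.schedule(Executors.newSingleThreadScheduledExecutor(), Executors.newFixedThreadPool(8), stage1, stage2, 30, TimeUnit.MINUTES) would start the first run immediately and the next one 30 minutes after the previous run ends. Thread priorities are not needed here, because the ordering comes from waiting on the futures.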
    

    You can also aggregate the futures with the CompletableFuture.allOf(CompletableFuture<?>...) method and wait on a single future.
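
As a rough illustration of the allOf approach, the sketch below combines each stage into a single future and chains the stages with thenCompose, so no thread has to block between them. StagedRun and its methods are hypothetical names introduced for this example, and the task lists stand in for the Runnable services from the question.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical helper; the class and method names are illustrative only.
class StagedRun {

    // Submits every task and returns one future that completes when all
    // of them have completed (exceptionally if any task failed).
    static CompletableFuture<Void> runAll(List<Runnable> tasks, Executor executor) {
        CompletableFuture<?>[] futures = tasks.stream()
                .map(task -> CompletableFuture.runAsync(task, executor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    // Stage 2 is submitted only after stage 1 completed normally. If stage 1
    // fails, the thenCompose function is not invoked and the returned future
    // completes exceptionally.
    static CompletableFuture<Void> runInStages(List<Runnable> stage1,
                                               List<Runnable> stage2,
                                               Executor executor) {
        return runAll(stage1, executor)
                .thenCompose(ignored -> runAll(stage2, executor));
    }
}
```

Calling join() or get() on the future returned by runInStages waits for both stages at once. If one stage-1 task throws, the remaining stage-1 tasks still run to completion, but stage 2 is never submitted.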

    Hope it helps!