java concurrency concurrenthashmap

Have a ScheduledExecutorService call a method with different arguments, so that each thread processes only one argument


In the constructor of my class, I create an executor:

scheduledExecutorService = Executors.newScheduledThreadPool(
    ARGUMENTS.size(),
    ExecutorServiceUtils.createThreadFactory("process-%d"));

ARGUMENTS is a Set containing 5 values of the enum ArgumentType (ARG1, ARG2, ARG3, ARG4, ARG5).

I call scheduleAtFixedRate once for each of them:

        for (ArgumentType arg : ARGUMENTS) {
            scheduledExecutorService.scheduleAtFixedRate(
                () -> work(arg),
                SCHEDULED_TASK_INITIAL_DELAY_MILLIS,
                1L, TimeUnit.MILLISECONDS);
        }

I also have a method named work(ArgumentType). When the threads execute, my logs show that any thread can call work with any of the 5 ArgumentTypes.

This isn’t what I want. I would like a thread dedicated to each ArgumentType, so that, for example, process-0 always handles ARG1.

Instead, I see that all 5 threads call work with any of the ArgumentTypes.

How can I schedule my threads so that, at each interval, all 5 argument types are processed concurrently?

Also, my work method contains the code below, where myMap is a ConcurrentHashMap<String, WorkValue> and WorkValue is an enum. If the value of an entry is WorkValue.READY, I want to set it to WorkValue.PROCESSING. Since multiple threads call this method, I have wrapped the check and update in a synchronized block.

        for (Map.Entry<String, WorkValue> entry : myMap.entrySet()) {
            synchronized (this) {
                if (entry.getValue().equals(WorkValue.READY)) {
                    entry.setValue(WorkValue.PROCESSING);
                    ids.add(entry.getKey());
                }
            }
        }

Is my use of synchronized correct here?


Solution

  • This isn’t what I want. I would like a thread dedicated to each ArgumentType, so that, for example, process-0 always handles ARG1.

    I don't think ScheduledThreadPoolExecutor can do this. You cannot make a particular worker thread in the pool take only the tasks that belong to a certain argument. Instead, you can create a Timer for each argument; a Timer internally uses a single dedicated thread to run all of its scheduled tasks:

            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    // Remember to use try/catch here: an uncaught exception terminates the timer thread. See
                    // https://stackoverflow.com/questions/8743027/java-timer-class-timer-tasks-stop-to-execute-if-in-one-of-the-tasks-exception-i
                    try {
                        work(arg);
                    } catch (Exception e) {
                        // Log the exception here instead of letting it propagate.
                    }
                }
            }, ...);
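
To make the Timer-per-argument idea concrete, here is a minimal self-contained sketch. The ArgumentType enum, the body of work, the thread names process-N, the daemon flag, and the 10 ms period are assumptions made for illustration; they are not taken from the question's real code. The sketch records which thread handled each argument, so you can check that every argument is served by a single thread.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

public class PerArgumentTimers {
    // Hypothetical stand-in for the question's enum.
    public enum ArgumentType { ARG1, ARG2, ARG3, ARG4, ARG5 }

    // Names of the threads that handled each argument (for demonstration only).
    public static final Map<ArgumentType, Set<String>> threadsSeen = new ConcurrentHashMap<>();

    // Placeholder for the real work(ArgumentType) method: it only records the calling thread.
    public static void work(ArgumentType arg) {
        threadsSeen.computeIfAbsent(arg, k -> ConcurrentHashMap.newKeySet())
                   .add(Thread.currentThread().getName());
    }

    // Starts one Timer, and therefore one dedicated thread, per argument type.
    public static Map<ArgumentType, Timer> startTimers(long initialDelayMillis, long periodMillis) {
        Map<ArgumentType, Timer> timers = new EnumMap<>(ArgumentType.class);
        for (ArgumentType arg : ArgumentType.values()) {
            // Assumed naming scheme: process-0 for ARG1, process-1 for ARG2, and so on.
            Timer timer = new Timer("process-" + arg.ordinal(), true); // true = daemon thread
            timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        work(arg);
                    } catch (Exception e) {
                        // An uncaught exception would end the timer thread, so report it and continue.
                        e.printStackTrace();
                    }
                }
            }, initialDelayMillis, periodMillis);
            timers.put(arg, timer);
        }
        return timers;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<ArgumentType, Timer> timers = startTimers(0L, 10L);
        Thread.sleep(200L);                      // let each timer fire a few times
        timers.values().forEach(Timer::cancel);  // stop all timer threads
        threadsSeen.forEach((arg, names) -> System.out.println(arg + " -> " + names));
    }
}
```

Each line printed by main should list exactly one thread name per argument. Note that a Timer runs its tasks sequentially on its single thread, so a slow work call for one argument delays only that argument's next run, not the others.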