java, multithreading, concurrency, producer-consumer

Should I use more threads than cores in this situation?


I have data that comes in from several multicast addresses. In my task consumer thread, I have an array of ReentrantLocks, with each index corresponding to a multicast address. I lock fairly on an address, because I want the data from each address to be processed sequentially, in the correct order:

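                task = TASK_QUEUE.take(); // ArrayBlockingQueue
                lock = task.getUdpChannel().getLock();
                boolean locked = false;
                try {
                    // retry until the lock is acquired
                    while (!locked) {
                        locked = lock.tryLock(TIMEOUT, TimeUnit.MILLISECONDS);
                    }
                    runTask(appenderIndex);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // unlock only if this thread holds the lock; otherwise
                    // unlock() throws IllegalMonitorStateException
                    if (locked) {
                        lock.unlock();
                    }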
                    LOGGER.info("{} {} {} {} {} |", () -> ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Hong_Kong")) ,
                        () -> remoteAdd.getHostName(), () -> remotePort, () -> ni,
                        () -> remaining);
                }

Checking the logs, it seems that some threads are "chattier" than others. I currently have 3 threads. When they handle the 12 datagram packets sent from 239.2.1.1 55000 (see below), I believe the task consumers behave as if they were single-threaded for that period, since all the other threads are blocked waiting for their turn (only one thread can hold the lock at a time).

[Image: one address sends more than the rest]

I am considering using more threads than I have cores: 12 task consumer threads versus 8 cores (excluding the main thread and producer threads). The extra threads could then process other queued tasks without waiting for the run of tasks from that specific address to finish completely.

Is it reasonable to have more threads than cores? Or would it be better to work out how to keep a backlog of tasks (other threads pass tasks from 239.2.1.1 55000 to the thread that currently holds the corresponding lock)?

I did read this, which gives me the idea that having more threads is better, but how many more threads would it be reasonable to have?

I have written some sample code, shown below. It is a working version that loops indefinitely and has a working task dispatcher.

public class Main {
    
    private final ArrayBlockingQueue<Task> taskPool;
    private final ArrayBlockingQueue<Task> taskQueue;
    private static int capacity = 2000;
    private static int threads = 3;
    private Thread[] producers;
    private Thread[] consumers;
    
    public Main() {
        this.taskPool = new ArrayBlockingQueue<>(capacity);
        this.taskQueue = new ArrayBlockingQueue<>(capacity, true);
        this.producers = new Thread[threads];
        consumers = new Thread[threads];
        ReentrantLock[] locks = new ReentrantLock[4]; 
        TaskConsumer[] cArr = new TaskConsumer[threads];
        TaskDispatch dispatch = new TaskDispatch(this.taskQueue, cArr, threads);
        for (int i=0; i< locks.length; i++) {
            locks[i] = new ReentrantLock();
        }
        for (int i=0; i< threads; i++) {
            this.producers[i] = new Thread(new TaskProducer(this.taskPool, this.taskQueue), "producer"+i);
            this.producers[i].start();
            cArr[i] = new TaskConsumer(this.taskPool, locks, capacity, dispatch);
            this.consumers[i] = new Thread(cArr[i], "consumer"+i);
            this.consumers[i].start();
        }
        Thread dThread = new Thread(dispatch, "dispatch");
        dThread.start();
    }
    
    public void fillPool() throws InterruptedException {
        for (int i=0; i< capacity; i++) {
            taskPool.put(new Task());
        }
    }

}

public class TaskProducer implements Runnable{
    private final static String[] randomStrings = {"random", "payload", "to", "simulate"};
    private ArrayBlockingQueue<Task> TASK_POOL;
    private ArrayBlockingQueue<Task> TASK_QUEUE;

    public TaskProducer(ArrayBlockingQueue<Task> pool, ArrayBlockingQueue<Task> queue) {
        // TODO Auto-generated constructor stub
        this.TASK_POOL = pool;
        this.TASK_QUEUE = queue;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Task task = null;
        Random rand = new Random();
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            if (task == null) {
                try {
                    task = TASK_POOL.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
//          System.out.println("iterating"+task);
            if (task != null) {
                int idx = rand.nextInt(0, 4);
                task.setId(idx);
                task.setData(Instant.now().toString());
                try {
                    TASK_QUEUE.put(task);
                    task = null;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

}

public class TaskConsumer implements Runnable {
    private ArrayBlockingQueue<Task> TASK_POOL;
    private ReentrantLock[] locks;
    private ArrayBlockingQueue<Task> internalQueue;
    private TaskDispatch dispatch;

    public TaskConsumer(ArrayBlockingQueue<Task> pool, ReentrantLock[] locks, int capacity, TaskDispatch dispatch) {
        // TODO Auto-generated constructor stub
        this.TASK_POOL = pool;
        this.locks = locks;
        this.internalQueue = new ArrayBlockingQueue<>(capacity, true);
        this.dispatch = dispatch;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Task task = null;
        while (true) {
//          System.out.println("iterating "+Thread.currentThread().getName());
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            if (task == null) {
                try {
                    task = internalQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (task != null) {
                //process
                int id = task.getId();
                System.out.println(Thread.currentThread().getName()+" "+task.getId()+" "+task.getData()+" "+Instant.now().toString());
                //return
                try {
                    if (internalQueue.peek() == null) {
                        synchronized (dispatch) {
                            if (internalQueue.peek() == null) {
                                dispatch.signalFreeConsumer(task.getId(), this);
                            }
                        }
                    }
                    TASK_POOL.put(task);
                    task = null;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    
    public void putInternalQueue(Task t) throws InterruptedException {
        this.internalQueue.put(t);
    }

}
public class TaskDispatch implements Runnable {
    private ConcurrentHashMap<Integer , TaskConsumer> mapping;
    private ArrayBlockingQueue<TaskConsumer> freeConsumers;
    private ArrayBlockingQueue<Task> TASK_QUEUE;
    private TaskConsumer[] consumers;

    public TaskDispatch( ArrayBlockingQueue<Task> queue,TaskConsumer[] consumers, int threads) {
        mapping = new ConcurrentHashMap<>(threads);
        freeConsumers = new ArrayBlockingQueue<>(threads);
        this.TASK_QUEUE = queue;
        this.consumers = consumers;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for (int i=0; i< consumers.length; i++) {
            try {
                freeConsumers.put(consumers[i]);
            } catch (InterruptedException e) {
                // restore the interrupt flag so the main loop below can exit
                Thread.currentThread().interrupt();
            }
        }
        Task task = null;
        TaskConsumer free = null;
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            if (task == null) {
                try {
                    task = TASK_QUEUE.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (task != null) {
                if (mapping.containsKey(task.getId())) {
                    synchronized (this) {
                        if (mapping.containsKey(task.getId())) {
                            try {
                                mapping.get(task.getId()).putInternalQueue(task);
                                task = null;
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }else {
                            continue;
                        }
                    }
                }else {
                    try {
                        if (free == null) {
                            free = freeConsumers.poll(5000, TimeUnit.MILLISECONDS);
                        }
                        
                        if (free != null) {
                            synchronized (this) {
                                mapping.putIfAbsent(task.getId(), free);
                                free.putInternalQueue(task);
                                task = null;
                                free = null;
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    public void signalFreeConsumer(Integer id, TaskConsumer consumer) throws InterruptedException {
        synchronized (this) {
//          System.out.println("removing "+id+" "+Thread.currentThread().getName());
            mapping.remove(id, consumer);
            freeConsumers.put(consumer);
//          System.out.println("notify "+Arrays.asList(freeConsumers.toArray()));
        }
    }

}
public class Runner {

    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        Main m = new Main();
        m.fillPool();
    }

}
public class Task {
    private int id;
    private String data;
    
    public Task () {}

    public Task(int id, String dat) {
        // TODO Auto-generated constructor stub
        this.id = id;
        this.data = dat;
    }

    public int getId() {
        return this.id;
    }

    public String getData() {
        return this.data;
    }

    public void setId(int id) {
        this.id = id;
    }

    public void setData(String data) {
        this.data = data;
    }
    
    public String toString() {
        return ""+id+" "+data;
    }
}

Solution

  • You can implement something like this.

    There is a fixed set of threads, equal in number to the CPU cores. A dispatcher takes a task from the queue, checks its channel, picks an available thread, and records that this thread is currently serving that channel (some sort of Map<Channel, Thread>). If the next task belongs to the same channel and the thread is still busy, the task is added to that thread's queue. If the task belongs to another channel, the dispatcher takes the next available thread. If all threads are busy, it waits for one to become available. Once a thread has finished all the tasks in its queue, its entry is removed from the map and the thread is ready to process tasks from any other channel.
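The scheme described above can be sketched compactly on top of a standard fixed thread pool. This is a minimal sketch under stated assumptions, not the answer's exact design: instead of a Map<Channel, Thread>, it keeps a per-channel backlog and records only that a channel currently has an active worker, which gives the same guarantee (one thread per channel at a time, tasks in submission order). The names KeyedSerialExecutor, submit, drain and shutdownAndWait are hypothetical, a String stands in for the channel, and the addresses other than 239.2.1.1 are made up for the demo.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs tasks that share a key one at a time, in submission order. */
final class KeyedSerialExecutor<K> {
    private final ExecutorService pool;
    // Backlog per key. A key is present only while a pool thread is serving it.
    private final Map<K, Queue<Runnable>> backlog = new HashMap<>();

    KeyedSerialExecutor(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
    }

    synchronized void submit(K key, Runnable task) {
        Queue<Runnable> queue = backlog.get(key);
        if (queue != null) {
            queue.add(task); // key is being served: append to its backlog
            return;
        }
        queue = new ArrayDeque<>();
        queue.add(task);
        backlog.put(key, queue);
        // No worker for this key yet: hand it to the next available pool thread.
        // If all threads are busy, the drain job waits in the pool's own queue.
        pool.execute(() -> drain(key));
    }

    private void drain(K key) {
        while (true) {
            Runnable next;
            synchronized (this) {
                next = backlog.get(key).poll();
                if (next == null) {
                    backlog.remove(key); // backlog empty: release the key
                    return;
                }
            }
            try {
                next.run(); // run outside the lock so other keys are not blocked
            } catch (RuntimeException e) {
                e.printStackTrace(); // keep serving the key if one task fails
            }
        }
    }

    /** Stops accepting new keys and waits for queued work (10 s is an arbitrary limit). */
    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedSerialExecutor<String> executor =
                new KeyedSerialExecutor<>(Runtime.getRuntime().availableProcessors());
        // Only the first address appears in the question; the others are illustrative.
        String[] channels = {"239.2.1.1:55000", "239.2.1.2:55000", "239.2.1.3:55000"};
        for (int i = 0; i < 12; i++) {
            String channel = channels[i % channels.length];
            int seq = i;
            executor.submit(channel, () -> System.out.println(
                    Thread.currentThread().getName() + " " + channel + " #" + seq));
        }
        executor.shutdownAndWait();
    }
}
```

With this shape a burst from one address occupies only one pool thread, while tasks from other addresses run on the remaining threads. The output interleaving across channels varies between runs; only the order within a channel is fixed.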

    As for your question about the number of threads: it depends on many internal factors that are out of your control, and testing is usually the best way to tune it. As a rule of thumb, if your tasks keep the CPU 100% busy (intensive computation, math, sorting arrays, etc.), a number of threads equal to the number of CPU cores gives more or less optimal performance. If the tasks block waiting for I/O or for a response from an external service (such as a DB or HTTP server), you can increase the number of threads; how far depends on how much time each thread spends waiting.
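To put a rough number on that rule of thumb, a widely cited sizing heuristic (not part of the answer above) is threads = cores x target utilization x (1 + wait time / compute time). The sketch below only illustrates it; the class PoolSizing, the method suggestedThreads and its parameters are hypothetical, and the wait and compute times have to come from your own measurements.

```java
/** Hypothetical helper illustrating a common thread-pool sizing heuristic. */
final class PoolSizing {

    /**
     * Estimates a pool size as cores * targetUtilization * (1 + wait / compute).
     *
     * @param cores             number of available CPU cores
     * @param targetUtilization desired CPU utilization, in (0, 1]
     * @param waitMillis        average time a task spends blocked (I/O, locks, ...)
     * @param computeMillis     average time a task spends on the CPU
     */
    static int suggestedThreads(int cores, double targetUtilization,
                                double waitMillis, double computeMillis) {
        if (cores < 1 || computeMillis <= 0 || waitMillis < 0
                || targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double estimate = cores * targetUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.round(estimate));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Pure computation: no waiting, so the estimate equals the core count.
        System.out.println("CPU-bound:   " + suggestedThreads(cores, 1.0, 0, 10));
        // Tasks that wait as long as they compute: twice the core count.
        System.out.println("half waiting: " + suggestedThreads(cores, 1.0, 10, 10));
    }
}
```

By this estimate, 12 consumer threads on 8 cores would match tasks that spend about a third of their time waiting (wait/compute = 0.5). Treat the result as a starting point for testing rather than a final answer.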