java multithreading nio threadpoolexecutor

What sort of Java Thread pool can I use for processing data from sockets?


I have the following code for starting a threaded server:

Thread server = new Thread(new ServerRunnable(serverPort, devMode, messageQueue, database));
server.start();

Thread worker1 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        1
    ));
Thread worker2 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        2
    ));
Thread worker3 = new Thread(
    new WorkerRunnable(
        messageQueue,
        database, 
        devMode, 
        3
    ));

worker1.start();
worker2.start();
worker3.start();

The ServerRunnable passes a State object containing bytes read and other information into the messageQueue. The WorkerRunnable threads take messages and process them.

I was looking at ThreadPoolExecutor, hoping I could use it to replace the three worker threads above with a pool that could grow or shrink as required, but it doesn't work the way I expected. It wants a queue of tasks to complete.

My code uses Java NIO, so there is no guarantee that an item placed in the queue is a complete message; it may require further processing. As a result, I can't use ThreadPoolExecutor the way I first imagined, which was to pass it the Runnable that contains the queue.
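
To make the problem concrete: a read from a non-blocking channel can return part of a message, or several messages at once, so the bytes have to be reassembled before a worker can treat them as one unit of work. The following is only a sketch under an assumed framing (a 4-byte big-endian length prefix followed by the payload, which is not something stated above); `FrameAssembler` and `feed` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameAssembler {
    // Assumption: every message is a 4-byte big-endian length followed by that many payload bytes.
    // The buffer is kept in "write mode" (position = number of buffered bytes) between calls.
    private ByteBuffer pending = ByteBuffer.allocate(1024);

    /** Appends one chunk as read from the channel and returns every message it completed. */
    List<byte[]> feed(byte[] chunk) {
        if (pending.remaining() < chunk.length) {
            // Grow the buffer so the new chunk fits behind the bytes already held.
            int needed = pending.position() + chunk.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(pending.capacity() * 2, needed));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(chunk);
        pending.flip(); // switch to read mode

        List<byte[]> complete = new ArrayList<>();
        while (pending.remaining() >= 4) {
            pending.mark();
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (pending.remaining() < length) {
                pending.reset(); // payload not fully received yet; wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            pending.get(payload);
            complete.add(payload);
        }
        pending.compact(); // keep leftover bytes and return to write mode
        return complete;
    }

    public static void main(String[] args) {
        FrameAssembler assembler = new FrameAssembler();
        // First chunk holds only part of the length prefix, so nothing is complete yet.
        System.out.println(assembler.feed(new byte[] {0, 0, 0}).size());
        // Second chunk finishes the prefix and delivers the two payload bytes.
        System.out.println(assembler.feed(new byte[] {2, 'h', 'i'}).size());
    }
}
```

With something like this in the server thread, only the arrays returned by `feed` would be wrapped in a task, so each queued item is a whole message.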

So it strikes me that if I want to use ThreadPoolExecutor here, I have to add it in the ServerRunnable class (kind of inverting my current process): once a message has been assembled, wrap it in a new WorkerRunnable and put that on the ThreadPoolExecutor's queue. Is that correct?

Something like this maybe:

BlockingQueue<Runnable> messageQueue = new LinkedBlockingQueue<>();

Thread server = new Thread(
    new ServerRunnable(
        serverPort, 
        devMode, 
        messageQueue, 
        database
    ));
server.start();
int  corePoolSize  =    5;
int  maxPoolSize   =   10;
long keepAliveTime = 5000;

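// Declared as ThreadPoolExecutor: prestartAllCoreThreads() is not part of ExecutorService
ThreadPoolExecutor threadPoolExecutor = 
    new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        messageQueue); 
// Start the core threads up front so they poll messageQueue directly
threadPoolExecutor.prestartAllCoreThreads();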

Then in the ServerRunnable class:

// Process incoming bytes into a message

messageQueue.put(new WorkerRunnable(database, devMode, message));

Is my understanding correct?


Solution

  • Yes. Alternatively, you can have a single consumer on your queue that keeps handing each task to an ExecutorService, something like this:

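    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    Thread consumer = new Thread(new Consumer(queue));
    consumer.start();
    
    class Consumer implements Runnable {
        private final ExecutorService     service = Executors.newCachedThreadPool();
        private final BlockingQueue<Task> queue;
    
        public Consumer(BlockingQueue<Task> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Task t = queue.take();          // blocks until a task is available
                    service.execute(new Worker(t)); // hand the task to the pool
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop consuming
            } finally {
                service.shutdown();                 // let queued workers finish, accept no more
            }
        }
    }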
    

    This way the producer and its queue are only loosely coupled to the ExecutorService.
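
If the tasks are handed straight to the pool instead, note that a ThreadPoolExecutor only adds threads beyond corePoolSize when its work queue refuses a task, so with an unbounded LinkedBlockingQueue the maxPoolSize is never reached. Below is a minimal sketch of a pool that can actually grow and shrink, assuming a SynchronousQueue hand-off and a caller-runs fallback when all threads are busy. Both choices, and the names `ElasticPoolSketch`, `MessageTask` and `processAll`, are illustrative and not taken from the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ElasticPoolSketch {

    // Hypothetical stand-in for WorkerRunnable: handles one complete message.
    static final class MessageTask implements Runnable {
        private final String message;
        private final AtomicInteger processed;

        MessageTask(String message, AtomicInteger processed) {
            this.message = message;
            this.processed = processed;
        }

        @Override
        public void run() {
            // Real processing of `message` would go here; the sketch only counts it.
            processed.incrementAndGet();
        }
    }

    static ThreadPoolExecutor newElasticPool(int corePoolSize, int maxPoolSize) {
        return new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                5000, TimeUnit.MILLISECONDS,                 // idle extra threads die after 5 s
                new SynchronousQueue<>(),                    // no buffering: each task needs a free thread
                new ThreadPoolExecutor.CallerRunsPolicy());  // at max size, the submitting thread runs the task
    }

    static int processAll(List<String> messages) {
        AtomicInteger processed = new AtomicInteger();
        ThreadPoolExecutor pool = newElasticPool(1, 4);
        for (String message : messages) {
            pool.execute(new MessageTask(message, processed)); // submit via execute(), not via the queue
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a", "b", "c")));
    }
}
```

The caller-runs fallback slows the producer down when the pool is saturated. Whether that is acceptable for a thread that also services sockets is a design decision; a bounded queue with a different rejection policy is another option.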