I have to create message sending application which send SMS to our customers. since generating and sending messages to customers takes considerable time, I have decided to implement it with producer and consumer pattern. So, this will not affect the original flow of execution.
Once I put raw data to the queue as a object, this will picked by one of consumer threads in thread pool and then generate message and send SMS. this flow should continue once application is up and running.
My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task. is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?
MessageSendUtil class is used to create common queue and initialize the taks.
public class SendMessageUtil {
public static void initializingSendMessage(RawData trxn) {
BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>();
ExecutorService produceMessagePool = Executors.newFixedThreadPool(1);
ExecutorService consumerMessagePool = Executors.newFixedThreadPool(5);
try {
produceMessagePool.submit(new Producer(sharedQueue));
int i = 0;
while (i++<5) {
consumerMessagePool.submit(new Consumer(sharedQueue));
}
produceMessagePool.shutdown();
consumerMessagePool.shutdown();
} catch (Exception ex){
System.out.println(ex.getMessage());
}
}
my consumer and producer class looks like this.
public class Producer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
public Producer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
try {
Message message = new Message();
message.setMessage("Test message sending");
sharedQueue.add(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
/
public class Consumer implements Runnable {
private final BlockingQueue<Message> sharedQueue;
private MessageBroadcaster messageBroadcaster;
public Consumer(BlockingQueue<Message> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
initializeMessageBroadcaster();
//Send messages to customer
while(true){
try {
Message message = sharedQueue.take();
messageBroadcaster.sendMessage(message);
} catch (Exception err) {
err.printStackTrace();
}
}
}
private void initializeMessageBroadcaster() {
if(Objects.isNull(messageBroadcaster)){
messageBroadcaster = new MessageBroadcasterImpl();
}
}
}
after several time of invoking initializingSendMessage(RawData trxn), live threads show like this.
My application works fine. but I found that each thread that consumer and producer thread pool creates stay alive in waiting state even after it finished the send SMS task.
That's correct. Your consumer thread task is inside of:
while (true) {
It will never stop. There are many ways to fix this. One way is to put constant "done" message on the queue. You could either put the same number of done messages on the queue as there are consumer or use something like this logic:
private static final Message DONE_MESSAGE = new Message();
...
// producer puts it into the queue when it is done
sharedQueue.add(DONE_MESSAGE);
...
// consumer takes it from the queue and quits
Message message = sharedQueue.take();
if (message == DONE_MESSAGE) {
// put it back on the queue to stop the other consumer threads
sharedQueue.put(DONE_MESSAGE);
// quit the consumer thread
return;
}
is it a problem for long running application or can I use the the consumer and producer thread pool for all the time instead of creating new thread pool each time when initializingSendMessage(RawData trxn) method invoked?
I would recommend keeping the same thread pool running for the life of the application. The whole point is to reuse threads and not have them created and shutdown all of the time. Also, there's no point in having 2 fixed thread pools. One with a size of 6 or even just used a cached thread pool and it will create a thread for each of the producer and consumer jobs.