I am trying to process messages that I receive from an MQ infrastructure. I have two blocking queues, sharedQueue and pubQueue. The sharedQueue is filled with the messages I receive from the MQ infrastructure through the callback below, which puts each message on sharedQueue.
client.setCallback(new CallBack("inst", sharedQueue));
The MessageManipulatorThread reads from sharedQueue, processes the message, and puts the response on pubQueue for later publishing.
new MessageManipulatorThread(sharedQueue,pubQueue).run();
The publisher thread takes messages from pubQueue and publishes them to the MQ infrastructure.
new PublisherThread(pubQueue).run();
Below is the full code:
public class ArrayBlockingQueueExample {
private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);
public static void main(String[] args) throws MqttException, Exception {
new ArrayBlockingQueueExample().startThreads();
}
public void startThreads() throws MqttException, Exception{
MqttClient client = new MQTTClientFactory().getInstance();
client.setCallback(new CallBack("inst", sharedQueue));
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
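}
}
public class MessageManipulatorThread implements Runnable {
private BlockingQueue<String> sharedQueue;
private BlockingQueue<String> pubQueue;
public MessageManipulatorThread( BlockingQueue<String> sharedQueue , BlockingQueue<String> pubQueue){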
this.sharedQueue = sharedQueue;
this.pubQueue = pubQueue;
}
public void run() {
while (true) {
try {
String msg = sharedQueue.take();
System.out.println(Thread.currentThread().getName() + " manipulator running => "+msg);
pubQueue.put(msg);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PublisherThread implements Runnable {
private BlockingQueue<String> sharedQueue;
public PublisherThread(BlockingQueue<String> sharedQueue){
this.sharedQueue = sharedQueue;
}
public void run() {
while (true) {
System.out.println("Running pub");
try {
System.out.println("pub=>"+sharedQueue.take() );
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
The problem is that new PublisherThread(pubQueue).run(); never does anything. I am guessing this is a thread synchronization issue. The PublisherThread is supposed to wait until pubQueue has data put there by the MessageManipulatorThread, but that doesn't seem to happen. The PublisherThread seems to be waiting on pubQueue to become available, but it never does. Is there anything else I should do? Any help is much appreciated.
You're calling Runnable.run() instead of Thread.start(), so this:
new MessageManipulatorThread(sharedQueue,pubQueue).run();
new PublisherThread(pubQueue).run();
won't work. run() executes the runnable's method in the current thread instead of creating a new thread and executing it there. Because MessageManipulatorThread.run() loops forever, that first call never returns, so the PublisherThread line is never reached.
Instead, do this:
new Thread(new MessageManipulatorThread(sharedQueue,pubQueue), "MessageManipulatorThread").start();
new Thread(new PublisherThread(pubQueue), "PublisherThread").start();
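To see the difference in isolation, here is a minimal sketch that records which thread executes a task when run() is called directly and when the task is wrapped in a Thread and started. The class and method names (RunVersusStart, threadNameWhenRunDirectly, threadNameWhenStarted) are illustrative and not part of the question's code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch; RunVersusStart and its methods are hypothetical names.
class RunVersusStart {

    // Calls run() directly: the task executes on the caller's own thread.
    static String threadNameWhenRunDirectly() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        task.run();
        return seen.get();
    }

    // Wraps the task in a Thread and calls start(): the task executes on the new thread.
    static String threadNameWhenStarted() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        Thread worker = new Thread(task, "worker");
        worker.start();
        try {
            worker.join(); // wait for the worker so its result is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("run():   " + threadNameWhenRunDirectly());
        System.out.println("start(): " + threadNameWhenStarted());
    }
}
```

The first two lines print the same thread name, while the third prints worker. In the question's code the task passed to run() never returns, which is why nothing after it executes.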
Edit:
fge made the following comment on the question:
Why don't you use an ExecutorService instead of doing its job by hand?
To clarify, he means using an ExecutorService to handle the messages that would otherwise go to pubQueue, instead of creating a thread that pulls and processes them manually. That code would look like this:
ExecutorService executor = Executors.newSingleThreadExecutor();
new Thread(new MessageManipulatorThread(sharedQueue, executor), "MessageManipulatorThread").start();
Then the MessageManipulatorThread class would change to:
public class MessageManipulatorThread implements Runnable {
private BlockingQueue<String> sharedQueue;
private ExecutorService executor;
public MessageManipulatorThread(BlockingQueue<String> sharedQueue, ExecutorService executor){
this.sharedQueue = sharedQueue;
this.executor = executor;
}
public void run() {
while (true) {
try {
String msg = sharedQueue.take();
System.out.println(Thread.currentThread().getName() + " manipulator running => "+msg);
executor.execute(new PublisherThread(msg));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
You'd then change PublisherThread so that it processes only the single message passed to it. This is another approach to what you're trying to do.
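A single-message PublisherThread could look roughly like the sketch below. It is only an illustration: the PUBLISHED list is an assumed stand-in for the real MQ publish call so that the example runs on its own, and PublisherDemo with its publishAll method is a hypothetical driver, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One task per message: no queue and no loop inside the publisher.
class PublisherThread implements Runnable {
    // Assumption: stands in for the MQ client so the sketch runs on its own.
    static final List<String> PUBLISHED = new CopyOnWriteArrayList<>();

    private final String message;

    PublisherThread(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        System.out.println("pub=>" + message);
        PUBLISHED.add(message); // replace with the real publish call
    }
}

// Hypothetical driver showing how the executor takes over the role of pubQueue.
class PublisherDemo {
    static List<String> publishAll(List<String> messages) {
        PublisherThread.PUBLISHED.clear(); // reset the stand-in sink
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String msg : messages) {
            executor.execute(new PublisherThread(msg));
        }
        executor.shutdown(); // accept no new tasks, finish the queued ones
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("publishing did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
        return new ArrayList<>(PublisherThread.PUBLISHED);
    }

    public static void main(String[] args) {
        System.out.println(publishAll(Arrays.asList("a", "b", "c")));
    }
}
```

With a single-thread executor the tasks run one after another in submission order, so the messages come out in the order they were submitted.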
This approach also allows some flexibility. With the first approach, PublisherThread can only process one message at a time (sequentially). Because the code depends only on the ExecutorService interface, you can swap the implementation so that it processes more than one message at a time (concurrently) simply by changing this:
ExecutorService executor = Executors.newSingleThreadExecutor();
To this:
ExecutorService executor = Executors.newFixedThreadPool(10);
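That statement allows the executor to start up to 10 threads, which means up to 10 messages can be processed at once. Note that with more than one thread, messages are no longer guaranteed to be published in the order they were received. See the Executors class for more ways of creating ExecutorService implementations.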
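As a rough way to check the difference between the two executors, the sketch below submits two tasks that each wait for the other to start. The class and method names (PoolSizeDemo, runsTwoTasksAtOnce) and the one-second timeout are assumptions made for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; PoolSizeDemo and runsTwoTasksAtOnce are hypothetical names.
class PoolSizeDemo {

    // Returns true only if both tasks were running at the same moment.
    static boolean runsTwoTasksAtOnce(ExecutorService executor) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothStarted.countDown();
            // Succeeds only if the other task also starts within the timeout.
            return bothStarted.await(1, TimeUnit.SECONDS);
        };
        try {
            Future<Boolean> first = executor.submit(task);
            Future<Boolean> second = executor.submit(task);
            return first.get() && second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            // Executor threads are non-daemon by default; shut down so the JVM can exit.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("single thread: " + runsTwoTasksAtOnce(Executors.newSingleThreadExecutor()));
        System.out.println("pool of 10:    " + runsTwoTasksAtOnce(Executors.newFixedThreadPool(10)));
    }
}
```

The single-thread executor reports false because the second task cannot start until the first one gives up waiting. The pool of 10 should report true, barring an unusually long scheduling delay. Whichever executor you use, call shutdown() when the application is done with it.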