Tags: java, multithreading, java.util.concurrent, mqtt

Thread synchronization when using BlockingQueue


I am trying to process messages that I receive from an MQ infrastructure. I have two blocking queues, sharedQueue and pubQueue. The callback registered below puts each incoming message into sharedQueue.

client.setCallback(new CallBack("inst", sharedQueue));

The MessageManipulatorThread reads from sharedQueue, processes each message, and puts the response into pubQueue for later publishing.

new MessageManipulatorThread(sharedQueue,pubQueue).run();

The publisher thread takes messages from pubQueue and publishes them to the MQ infrastructure.

new PublisherThread(pubQueue).run();

Below is the full code:

public class ArrayBlockingQueueExample {

    private BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<>(64);
    private BlockingQueue<String> pubQueue = new ArrayBlockingQueue<>(64);

    public static void main(String[] args) throws MqttException, Exception {
        new ArrayBlockingQueueExample().startThreads();
    }

    public void startThreads() throws MqttException, Exception {
        MqttClient client = new MQTTClientFactory().getInstance();
        client.setCallback(new CallBack("inst", sharedQueue));

        new MessageManipulatorThread(sharedQueue, pubQueue).run();
        new PublisherThread(pubQueue).run();
    }
}

public class MessageManipulatorThread implements Runnable {

    private BlockingQueue<String> sharedQueue;
    private BlockingQueue<String> pubQueue;

    public MessageManipulatorThread(BlockingQueue<String> sharedQueue, BlockingQueue<String> pubQueue) {
        this.sharedQueue = sharedQueue;
        this.pubQueue = pubQueue;
    }

    public void run() {
        while (true) {
            try {
                String msg = sharedQueue.take();
                System.out.println(Thread.currentThread().getName() + " manipulator running => " + msg);
                pubQueue.put(msg);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class PublisherThread implements Runnable {

    private BlockingQueue<String> sharedQueue;

    public PublisherThread(BlockingQueue<String> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    public void run() {
        while (true) {
            System.out.println("Running pub");
            try {
                System.out.println("pub=>" + sharedQueue.take());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The problem is that new PublisherThread(pubQueue).run(); never works. I am guessing this is a thread synchronization issue. The publisher is supposed to wait until MessageManipulatorThread has put data into pubQueue, but that does not seem to happen. PublisherThread appears to be waiting for pubQueue to become free, and it never does. Is there anything else I should do? Any help is much appreciated.


Solution

  • You're using Runnable.run() instead of Thread.start(), so this:

    new MessageManipulatorThread(sharedQueue,pubQueue).run();
    new PublisherThread(pubQueue).run();
    

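    won't work. Calling run() directly executes the Runnable's method on the current thread instead of creating a new thread and running it there. Because MessageManipulatorThread.run() loops forever, that first call never returns, so the line that creates PublisherThread is never reached.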

    Instead, do this:

    new Thread(new MessageManipulatorThread(sharedQueue,pubQueue), "MessageManipulatorThread").start();
    new Thread(new PublisherThread(pubQueue), "PublisherThread").start();
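
The difference between the two calls can be checked with a small standalone sketch. It is not part of the original code: the class RunVsStartDemo and its helper methods are hypothetical names used only for illustration. Each helper records which thread actually executed the task.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the question's code.
class RunVsStartDemo {

    // Calls run() directly and returns the name of the thread that executed the task.
    static String nameWhenCallingRun() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        task.run(); // plain method call: executes on the caller's thread
        return seen.get();
    }

    // Uses Thread.start() and returns the name of the thread that executed the task.
    static String nameWhenCallingStart() {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(Thread.currentThread().getName());
        Thread worker = new Thread(task, "worker");
        worker.start(); // creates a new thread that calls task.run()
        try {
            worker.join(); // wait for the worker so its result is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:       " + Thread.currentThread().getName());
        System.out.println("run() executed on:   " + nameWhenCallingRun());
        System.out.println("start() executed on: " + nameWhenCallingStart());
    }
}
```

With run(), the task reports the caller's own thread name; with start(), it reports "worker". In the question's code this means the infinite loop in MessageManipulatorThread occupies the main thread.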
    

    Edit:

    fge made the following comment in the question:

    Why don't you use an ExecutorService instead of doing its job by hand?

    To clarify, he means handing each message to an ExecutorService for publishing, instead of creating a thread that pulls messages from pubQueue and processes them manually. That code would look like this:

    ExecutorService executor = Executors.newSingleThreadExecutor();
    new Thread(new MessageManipulatorThread(sharedQueue, executor), "MessageManipulatorThread").start();
    

    Then the MessageManipulatorThread class would change to:

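    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    
    public class MessageManipulatorThread implements Runnable {
    
        private final BlockingQueue<String> sharedQueue;
        private final ExecutorService executor;
    
        public MessageManipulatorThread(BlockingQueue<String> sharedQueue, ExecutorService executor) {
            this.sharedQueue = sharedQueue;
            this.executor = executor;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // Blocks until a message is available.
                    String msg = sharedQueue.take();
                    System.out.println(Thread.currentThread().getName() + " manipulator running => " + msg);
                    // Hand the message to the executor instead of putting it on pubQueue.
                    executor.execute(new PublisherThread(msg));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }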
    

    You'd then change PublisherThread so that it processes only the single message passed to its constructor, with no queue and no loop. This is another approach to what you're trying to do.
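
A minimal sketch of what that per-message PublisherThread might look like follows. The real MQTT publish call is not shown in the question, so the static published list here is only a stand-in for it, and PublisherTaskDemo with its publishAll method is a hypothetical driver added for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One task per message: no queue and no loop inside the task.
class PublisherThread implements Runnable {

    // Assumption: stand-in for the real MQTT publish call, which the question does not show.
    static final List<String> published = new CopyOnWriteArrayList<>();

    private final String msg;

    PublisherThread(String msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        System.out.println("pub=>" + msg);
        published.add(msg);
    }
}

// Hypothetical driver that submits one PublisherThread task per message.
class PublisherTaskDemo {

    static List<String> publishAll(List<String> messages) {
        PublisherThread.published.clear();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String m : messages) {
            executor.execute(new PublisherThread(m));
        }
        executor.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(PublisherThread.published);
    }

    public static void main(String[] args) {
        System.out.println(publishAll(Arrays.asList("a", "b", "c")));
    }
}
```

A single-thread executor runs its tasks sequentially in submission order, so in this sketch the messages are published in the order they were handed over.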

    This approach also allows some flexibility. With the queue-based approach, PublisherThread can only process one message at a time (sequentially). Because the code depends only on the ExecutorService interface, you can swap in a different implementation that processes several messages at once (concurrently) simply by changing this:

    ExecutorService executor = Executors.newSingleThreadExecutor();
    

    To this:

    ExecutorService executor = Executors.newFixedThreadPool(10);
    

    That statement allows the executor to start up to 10 threads, which means up to 10 messages can be processed at once. See the Executors class for more ways of creating ExecutorService implementations.
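
The effect of the pool size can be observed with a small sketch. PoolSizeDemo and runsConcurrently are hypothetical names, and the one-second timeout is an arbitrary choice for the demo. Each task blocks its worker thread until released, so all tasks can only have started at the same moment if the executor has enough threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class PoolSizeDemo {

    // Returns true if the executor had `tasks` tasks running at the same time.
    static boolean runsConcurrently(ExecutorService executor, int tasks) {
        CountDownLatch arrived = new CountDownLatch(tasks); // counts tasks that have started
        CountDownLatch release = new CountDownLatch(1);     // lets started tasks finish
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                arrived.countDown();
                try {
                    release.await(); // hold this worker thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // Every task blocks after starting, so all of them arrive only if
            // the executor has at least `tasks` worker threads.
            return arrived.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown(); // unblock the tasks
            executor.shutdown(); // let queued tasks finish, then stop the workers
        }
    }

    public static void main(String[] args) {
        System.out.println("newSingleThreadExecutor: "
                + runsConcurrently(Executors.newSingleThreadExecutor(), 3));
        System.out.println("newFixedThreadPool(10):  "
                + runsConcurrently(Executors.newFixedThreadPool(10), 3));
    }
}
```

One trade-off to keep in mind: with more than one worker thread, tasks may complete in a different order than they were submitted, so the single-thread executor is the safer choice if messages must be published in order. Calling shutdown() when the executor is no longer needed lets its non-daemon worker threads exit.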