
Thread consuming queue, Terminate


I would like to know whether the following approach is correct. I have a producer thread and a consumer thread that work on a shared BlockingQueue. The producer is a sniffer thread, so it stops on its own, but for the consumer I plan to terminate by looping on the status (alive/dead) of the producer thread. Any suggestions? Thanks

-)From the Main thread:

    // ArrayBlockingQueue has no no-arg constructor; 1024 is an arbitrary example capacity
    ArrayBlockingQueue<PcapPacket> queue = new ArrayBlockingQueue<>(1024);
    Producer p = new Producer(queue);
    Thread t1 =new Thread(p);
    t1.start();
    Consumer c = new Consumer(queue,t1);
    new Thread(c).start();

-)Producer

public void run() {
         public void nextPacket(PcapPacket packet, String user) {
            try {
                queue.put(packet); 
            } catch (InterruptedException ex) {

            }

-) Consumer

public void run() {
   while(producer.isAlive()){
        try {
        //Thread.sleep(50);
        packet=queue.take(); 

Solution

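  • Polling the producer's status is sub-optimal: if the producer dies after the isAlive() check while the queue is empty, the consumer blocks in take() forever, and packets still queued when the producer exits are never processed.

    The preferred approach is for the producer to put a 'poison pill' into the queue as it exits, and for the consumer to end its loop as soon as it receives that pill: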

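    import java.util.concurrent.BlockingQueue;
    
    class Producer implements Runnable {
    
        // Sentinel ("poison pill"); the consumer compares against it by identity.
        static final Object TIME_TO_STOP = new Object();
    
        private final BlockingQueue<Object> q;
    
        Producer(BlockingQueue<Object> q) {
            this.q = q;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    // readNextPacket() is a placeholder for the real packet source
                    q.put(readNextPacket());
                }
            } catch (InterruptedException e) {
                // interrupted while blocked in put(); fall through and send the pill
            } finally {
                // runs on every exit path, so the consumer is always told to stop
                try {
                    q.put(TIME_TO_STOP);
                } catch (InterruptedException e) {
                    // somehow log failure to stop properly
                }
            }
        }
    }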
    
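    class Consumer implements Runnable {
    
        private final BlockingQueue<Object> q;
    
        Consumer(BlockingQueue<Object> q) {
            this.q = q;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    Object packet = q.take(); // blocks until an item is available
                    // identity check: only the shared sentinel ends the loop
                    if (packet == Producer.TIME_TO_STOP) {
                        break;
                    }
                    // process packet
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }
    }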
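
To see the poison-pill handshake run end to end, here is a minimal self-contained sketch. It is not the asker's sniffer code: the class name PoisonPillDemo, the helper runDemo, the "packet-N" strings standing in for captured packets, and the queue capacity of 2 are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {

    // Sentinel compared by identity; no real item can ever be this instance.
    static final Object TIME_TO_STOP = new Object();

    // Hypothetical helper: runs one producer and one consumer and
    // returns the items the consumer processed, in order.
    static List<Object> runDemo(int packetCount) throws InterruptedException {
        // Small capacity (an arbitrary choice) so put() really blocks.
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
        List<Object> processed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < packetCount; i++) {
                    queue.put("packet-" + i); // stand-in for a captured packet
                }
                queue.put(TIME_TO_STOP); // last item: tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Object item = queue.take();
                    if (item == TIME_TO_STOP) {
                        break; // pill received, everything before it was handled
                    }
                    processed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), reading 'processed' here is safe
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [packet-0, packet-1, packet-2, packet-3, packet-4]
        System.out.println(runDemo(5));
    }
}
```

Because the pill travels through the same FIFO queue as the data, the consumer only sees it after every earlier item has been taken, so nothing is dropped and the consumer never needs to inspect the producer thread. With several consumers, one pill per consumer would be needed.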