Search code examples
javamultithreadingproducer-consumer

Multiple producers to a queue not working


This is an exercise from Java 7 Concurrency cookbook. Goal is to have multiple producers write Events to a shared queue and a cleaner task (daemon thread) will delete the Events older than 5 seconds from the queue.(Find code below) If I try writing test for this or use it in a main() program, it just inserts 3 items in the queue and exits. My question: if there is a for-loop in WriterTask, why doesn't the program run the course of for-loop (insert ~30 items in queue) and exit before that?

import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
class Event{
    Date var;
    public Event(Date v){
        var = v;
    }
}
class WriterTask implements Runnable{
    Deque<Event> queue;
    int from = 0,to = 0;
    public WriterTask(Deque<Event> queue, int from, int to){
        this.queue = queue;
        this.from = from;
        this.to = to;
    }
    @Override
    public void run(){
        for(int i=from;i<to;i++){
            Event ev = new Event(new Date());
            System.out.println("Generating event number: "+i);
            queue.addFirst(ev);
            try{
                System.out.println("SLeeping now:"+Thread.currentThread().getName());
                TimeUnit.SECONDS.sleep(1);
            }catch(InterruptedException ie){
                System.out.println("Interrupted the thread!");
            }
        }
    }
}


class CleanerTask implements Runnable{
    Deque<Event> queue;

    public CleanerTask(Deque<Event> queue){
        this.queue = queue;
    }
    @Override
    public void run(){
        while(true){
            Date dt = new Date();
            clean(dt);
        }
    }

    public void clean(Date date){
//        System.out.println("Cleaning for date, current size of queue:"+queue.size());
        long difference = 0;
        boolean anythingDeleted = false;
        do{
            if(queue.isEmpty()){return;}
            Event first = queue.getLast();
            difference = date.getTime() - first.var.getTime();
            if(difference >  5000){
                queue.removeLast();
                anythingDeleted = true;
            }
        }while(difference>5000);
        if(anythingDeleted){
            System.out.println("Queue cleaned up, new size:"+queue.size());
        }
    }
}

Test:

import org.junit.Test;
import org.junit.rules.Timeout;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;
public class WriterCleanerTest {
    @Test
    public void consumeCleanTest(){
        Deque<Event> queue = new ArrayDeque<Event>();
        WriterTask wt = new WriterTask(queue, 0, 10);
        Thread wt1 = new Thread(wt);
        Thread wt2 = new Thread(new WriterTask(queue, 60, 70));
        Thread wt3 = new Thread(new WriterTask(queue, 100, 110));

        wt1.start();
        wt2.start();
        wt3.start();

        System.out.println("Size of queue:"+queue.size());

        Thread cleanerTask = new Thread(new CleanerTask(queue));
        cleanerTask.setDaemon(true);
        cleanerTask.start();
    }
}

Output:

Size of queue:0
Final Size of queue:0
Generating event number: 0
Generating event number: 60
SLeeping now:Thread-1
Generating event number: 100
SLeeping now:Thread-2
SLeeping now:Thread-0

Process finished with exit code 0

Solution

  • You are not waiting for your threads to complete.

    Add at the end of your code:

    cleanerTask.start();
    // Wait for everything to finish.
    wt1.join();
    wt2.join();
    wt3.join();
    cleanerTask.join();
    

    Essentially your main thread must join all other running threads before it exits.