Java BlockingQueue consumer blocks on full queue


I'm writing a small program that puts tweets from the Twitter public stream into an HBase database. The program uses two threads: one to collect the tweets and one to process them. The first thread uses a twitter4j StatusListener to receive the tweets and puts them into an ArrayBlockingQueue with a capacity of 100. The second thread takes a status from the queue, extracts the needed data, and writes it to the database. Processing a status takes longer than collecting one.

The producer looks like this:

public void onStatus(Status status) {
    try {
        this.queue.put(status);
    } catch(Exception ex) {
        ex.printStackTrace();
    }
}

The consumer uses take and calls a function to process the new status:

public void run() {
    try {
        while(true) {
            // Get new status to process
            this.status = this.queue.take();
            this.analyse();
        }
    } catch(Exception ex) {
        ex.printStackTrace();
    }
}

In the main function the two threads are created and started:

ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100);

Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public));
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public));

ta_public.start();
st_public.start();

The program runs for a while without any problem, but then suddenly stops. At that point the queue is full, and it seems that the consumer is unable to take a new status from it. I tried several variations of the producer/consumer pattern without success. No exception is thrown.

I don't know where to look for the failure. I hope someone can give me a hint or a solution.


Solution

  • When working with blocking queues, double-check every blocking call in the code (put and take for ArrayBlockingQueue), and check for typos in the variable names when working with multiple queues or lists.
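
One way to make such a silent stall visible while debugging is to swap the untimed put and take calls for their timed counterparts, offer and poll, and log whenever a timeout expires. The following is only a minimal sketch under stated assumptions, not the original program: the class TimedQueueDemo, the method run, the POISON stop marker, and the plain String items (standing in for twitter4j Status objects) are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TimedQueueDemo {
    // Hypothetical stop marker; the original program has no such sentinel.
    private static final String POISON = "__STOP__";

    // Timed replacement for put(): logs when the queue stays full and
    // fails fast if the consumer thread is no longer running.
    private static void offerOrFail(BlockingQueue<String> queue, String item,
                                    Thread consumer) throws InterruptedException {
        while (!queue.offer(item, 1, TimeUnit.SECONDS)) {
            System.err.println("producer: queue full for 1s");
            if (!consumer.isAlive()) {
                throw new IllegalStateException("consumer thread died");
            }
        }
    }

    // Sends `items` strings through a bounded queue and returns how many
    // of them the consumer processed.
    static int run(int capacity, int items) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Timed replacement for take(): an idle consumer shows up in the log.
                    String item = queue.poll(1, TimeUnit.SECONDS);
                    if (item == null) {
                        System.err.println("consumer: nothing to take for 1s");
                        continue;
                    }
                    if (POISON.equals(item)) {
                        break;
                    }
                    processed.incrementAndGet(); // stand-in for the real processing step
                }
            } catch (InterruptedException ex) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (RuntimeException ex) {
                // Report unexpected failures that would otherwise end the loop unnoticed.
                ex.printStackTrace();
            }
        }, "consumer");
        consumer.start();

        for (int i = 0; i < items; i++) {
            offerOrFail(queue, "status-" + i, consumer);
        }
        offerOrFail(queue, POISON, consumer);
        consumer.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + run(5, 20));
    }
}
```

If the producer keeps logging a full queue while the consumer never logs anything, the consumer is most likely stuck somewhere other than the queue it is supposed to read from, for example in a blocking call on a different queue or inside the processing step. Giving both threads names also makes them easier to find in a thread dump.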