In a JPA project, I have two threads: producer and consumer.
Producer should get the search result of twitter query from another class and put it in a LinkedBlockingQueue, thread consumer should consume the result and use another class to persist them to MYSQL.
Here I show you first the main class, producer and consumer :
Main Class:
public class RunQuery {
public final static EntityManagerFactory
emf=Persistence.createEntityManagerFactory("mypu");
public static void main(String[] args) throws Exception
{
org.apache.log4j.BasicConfigurator.configure(new
NullAppender());
BlockingQueue<Pair<String, QueryResult>> tweetBuffer=new
LinkedBlockingQueue<Pair<String,QueryResult>>();
// Creating Producer and Consumer Thread
Runnable producerThread = new
TwitterStreamProducer("producer",tweetBuffer,args);
Runnable consumerThread = new
TwitterStreamConsumer("consumer",tweetBuffer);
Thread producer=new Thread(producerThread);
Thread consumer=new Thread(consumerThread);
producer.start();
Thread.sleep(100);
consumer.start();
}
Producer thread:
public class TwitterStreamProducer implements Runnable{
private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
private final ResultsController resultsController;
private String[] Keywords;
private String name;
public TwitterStreamProducer(String name,
BlockingQueue<Pair<String, QueryResult>> tweetBuffer,
String[]
keywords) {
this.tweetBuffer=tweetBuffer;
this.resultsController=new ResultsController();
this.Keywords=keywords;
this.name=name;
}
public void run() {
for(String key:Keywords)
{
boolean interrupted = false;
try {
this.tweetBuffer.put( new Pair<String,QueryResult>
(key,resultsController.search(key)) );
Logger.getLogger("TwitterApp").info("one result added to the
queue, current size:"+tweetBuffer.size());
}
catch (InterruptedException e) {
interrupted = true;
// Logger.getLogger("Producer").info( this.name+ " Interrupted
"+e.toString() );
}
catch(Exception e)
{
interrupted = true;
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
}
Consumer Thread:
public class TwitterStreamConsumer implements Runnable{
private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
private final ResultsController resultsController;
private String name=null;
public TwitterStreamConsumer(String name,
BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {
this.name=name;
this.tweetBuffer=tweetBuffer;
this.resultsController=new ResultsController();
}
public void run() {
while(! tweetBuffer.isEmpty())
{
try {
Logger.getLogger("TwitterApp").info(this.name+" has consummed
queue, current size of queue:"+tweetBuffer.size());
resultsController.parse(this.tweetBuffer.take());
} catch (InterruptedException e) {
Logger.getLogger("Consumer").info(this.name+ " interrupted
"+e.getMessage());
}
}
}
}
If other information is needed, I will provide here.
This is the Logger result, When I run it, the producer always produces 8 results and after that nothing more happens and the application will not be interrupted or does not produce any error:
Jul 12, 2015 12:07:22 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:1
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:2
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:3
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:4
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:5
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:6
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:7
Jul 12, 2015 12:07:26 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:8
When I debug it with the break point on the beginning of run method in consumer thread, then it work properly.
Thank you in advance for your suggestions.
Your consumer ends in the first run. When it executes for the first time, the buffer should be empty and then it doesn't enter in the while loop and ends the run method and the Thread.
You have to loop waiting for the buffer and, sometimes, if the buffer is empty this shouldn't end the consumer. It has just to wait for the next message. You could do this with a code like this one:
public class TwitterStreamConsumer implements Runnable{
private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
private final ResultsController resultsController;
private String name=null;
private boolean stopped;
public TwitterStreamConsumer(String name, BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {
this.name=name;
this.tweetBuffer=tweetBuffer;
this.resultsController=new ResultsController();
}
public void stop() {
stopped = true;
}
public void run() {
stopped = false;
while(! stopped) {
try {
resultsController.parse(this.tweetBuffer.take());
Logger.getLogger("TwitterApp").info(this.name+" has consummed queue, current size of queue:"+tweetBuffer.size());
}
catch (InterruptedException e) {
Logger.getLogger("Consumer").info(this.name+ " interrupted "+e.getMessage());
}
}
}
}
To stop the thread you have to do something like that:
consumer.stop();
consumerThread.interrupt();