Multithreaded Twitter access through Twitter4J


I have written the following Java code:

twitterStream.addListener(new StreamListener());

FilterQuery filterQuery = new FilterQuery();
filterQuery.follow(filteringUsers);
filterQuery.track(filteringWords);

twitterStream.filter(filterQuery);

to track some users and keywords on Twitter (via the Streaming API). Here, StreamListener is my own implementation of the listener.

I am tracking a lot of keywords, hashtags and users, so a lot of tweets accumulate in memory while waiting to be processed. The processing itself is trivial: I receive each tweet in the listener (in the onStatus() method) and write it to the database.

Still, because the tweets have to wait in memory, the memory fills up within a few hours. In a 20-minute run I accumulated 177000 LinkedBlockingQueue$Node objects and 1.272MB of char[] in memory (seen through profiling).

I would like to keep the pipeline running continuously, which is clearly not possible in its current state.

Thus, I would like to know whether there is a way to run multiple listeners on separate threads, so that they can empty the queue of tweets concurrently and speed up the processing.

  1. If it is possible: do these listeners empty the queue concurrently? That is, could they end up reading the same tweet multiple times?
  2. If it is not possible: how can I solve my problem?

Thanks in advance.


Solution

  • Although a direct multithreaded solution is not possible through Twitter4J, you can get the same effect by handling the queue with multiple threads inside the listener class.

    Suppose StreamListener is your implementation of Twitter4J's StatusListener interface.

    We keep our own queue inside StreamListener, as a private attribute:

    private LinkedBlockingQueue<String> tweets;
    

    The queue is initialized in the constructor:

    tweets = new LinkedBlockingQueue<String>();
    

    Moreover, in the constructor we build a thread pool meant to read tweets from the queue (in batches) and store them in the database:

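        final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        Runnable tweetAnalyzer = defineMonitoringRunnable(tweetRepository);
        for (int i = 0; i < NUM_THREADS; i++) {
            // Every worker runs the same Runnable; each run() call has its own batch list.
            executor.execute(tweetAnalyzer);
            try {
                // Stagger the worker start-up by THREADS_DELAY milliseconds.
                Thread.sleep(THREADS_DELAY);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and stop starting further workers.
                Thread.currentThread().interrupt();
                break;
            }
        }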
    

    where the Runnable object can be built as follows:

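    private Runnable defineMonitoringRunnable(final TweetRepository tweetRepository) {
        return new Runnable() {
    
            @Override
            public void run() {
                List<String> tempTweets = new ArrayList<String>();
    
                // Run until the worker thread is interrupted (e.g. by executor.shutdownNow()).
                while (!Thread.currentThread().isInterrupted()) {
                    // drainTo() removes the elements it transfers, so each tweet
                    // ends up in the batch of exactly one worker.
                    tempTweets.clear();
                    tweets.drainTo(tempTweets);
    
                    // Another worker may have emptied the queue first; skip empty batches.
                    if (!tempTweets.isEmpty()) {
                        tweetRepository.insert(tempTweets);
                    }
    
                    try {
                        Thread.sleep(TWEETS_SAVING_TIME);
                    }
                    catch (InterruptedException ex) {
                        // Restore the interrupt flag so the loop condition ends the worker.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
    }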
    

    (TWEETS_SAVING_TIME is the time, in milliseconds, that each worker thread waits between one batch save and the next.)

    Finally, the onStatus() method puts each tweet into the queue as soon as it arrives at the listener:

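    @Override
    public void onStatus(Status status) {
        // getRawJSON() returns null unless the JSON store is enabled in the
        // Twitter4J configuration, and the queue rejects null elements.
        String rawJson = TwitterObjectFactory.getRawJSON(status);
        if (rawJson != null) {
            tweets.add(rawJson);
        }
    }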
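
On the first question (whether several workers could read the same tweet twice), here is a minimal, self-contained sketch of the same producer/consumer idea without Twitter4J. It is not the code above: the class name DrainSketch, the batch size, the queue capacity and the fake "tweet-N" strings are all assumptions made for illustration, and a concurrent set stands in for the database. Instead of sleeping between polls, each worker blocks on take() and then drains whatever else is already queued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainSketch {

    static final int MAX_BATCH = 500;       // assumed batch size, tune for your database
    static final int QUEUE_CAPACITY = 1000; // assumed bound; put() blocks when the queue is full

    /** Pushes `total` fake tweets through `workers` consumers; returns {inserted, distinct}. */
    static int[] run(int total, int workers) throws InterruptedException {
        final BlockingQueue<String> tweets = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        final Set<String> stored = ConcurrentHashMap.newKeySet(); // stand-in for the database
        final AtomicInteger inserted = new AtomicInteger();

        ExecutorService executor = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                List<String> batch = new ArrayList<>();
                try {
                    while (true) {
                        // Block until at least one tweet is available (no busy waiting).
                        batch.add(tweets.take());
                        // Then grab whatever else is already queued, up to the batch size.
                        tweets.drainTo(batch, MAX_BATCH - 1);
                        stored.addAll(batch);             // hypothetical "insert into database"
                        inserted.addAndGet(batch.size());
                        batch.clear();
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the blocked take(); end the worker.
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer side: this is the role onStatus() plays in the listener.
        for (int i = 0; i < total; i++) {
            tweets.put("tweet-" + i);
        }

        // Wait until every tweet has been stored, then stop the workers.
        while (inserted.get() < total) {
            Thread.sleep(5);
        }
        executor.shutdownNow();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { inserted.get(), stored.size() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(5000, 4);
        System.out.println("inserted=" + result[0] + ", distinct=" + result[1]);
    }
}
```

If a tweet were handed to two workers, the inserted count would exceed the number of distinct stored strings; both should equal the number of tweets produced. Whether a bounded queue suits the real listener is a judgment call: with put(), a full queue would block the thread that calls onStatus(), which trades unbounded memory growth for back-pressure.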