
ExecutorService does not time out and continues to run


I have the following code:

  def getIndustryData(String[] theIndustries) {

            PrintWriter printWriter = new PrintWriter(new BufferedWriter(new FileWriter("result.txt")))

            //Listens to Twitter statuses and carries out the following methods on the status
            StatusListener listener = new StatusListener() {
                @Override
                void onStatus(Status status) {

                    printWriter.write(status.getLang() + "|||" + status.getText())
                    printWriter.println()
                }

                @Override
                void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {

                }

                @Override
                void onTrackLimitationNotice(int numberOfLimitedStatuses) {

                }

                @Override
                void onScrubGeo(long userId, long upToStatusId) {

                }

                @Override
                void onStallWarning(StallWarning warning) {

                }

                @Override
                void onException(Exception ex) {

                }
            }

            TwitterStream stream = new TwitterStreamFactory().getInstance()
            stream.addListener(listener)
            FilterQuery fq = new FilterQuery()
            fq.track(theIndustries)
            ExecutorService executor = Executors.newSingleThreadExecutor()

            Future<String> future = executor.submit(new Callable<String>() {
                @Override
                String call() throws Exception {

                    stream.filter(fq)
                    return null
                }
            })

            try {

                future.get(2, TimeUnit.MINUTES)

            } catch (TimeoutException e) {

                stream.removeListener(listener)
                stream.shutdown()
                future.cancel(true)
                executor.shutdownNow()

            }
        }

I am using Twitter4J to access the Twitter API. I want to write tweets to a file for 2 minutes and then stop.

The stream.filter(fq) method keeps running after the stated 2 minutes, and the TimeoutException catch block is never reached. I thought that after 2 minutes the exception would be caught and I could end the method, but this does not happen.


Solution

  • You have the wrong mental model of how TwitterStream works, and most probably also of how the standard Java Future and ExecutorService work.

    TwitterStream doesn't run on any thread you provide to it. TwitterStream.filter starts a new thread internally, as you can see in the source at https://github.com/yusuke/twitter4j/blob/master/twitter4j-stream/src/main/java/twitter4j/TwitterStreamImpl.java#L317 and https://github.com/yusuke/twitter4j/blob/master/twitter4j-stream/src/main/java/twitter4j/TwitterStreamImpl.java#L516

    Also, Future.get with a timeout is not guaranteed to fail with a TimeoutException. If the job finishes within the timeout, it simply returns the value. And this is exactly your case! stream.filter(fq) starts a new thread, which is fast, and then returns, so your future completes immediately with null.

    The simplest (but probably not the best) way to make it work is something like this:

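    stream.filter(fq)   // returns right away; Twitter4J reads the stream on its own thread
    try {
        Thread.sleep(2 * 60 * 1000)   // just sleep on the caller thread
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt()   // restore the interrupt flag and stop early
    }
    stream.removeListener(listener)
    stream.cleanUp()
    //stream.shutdown()   // don't think you really need shutdown, cleanUp() seems to be enough
    printWriter.close()   // flush the buffered tweets to result.txt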
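
To see why the timeout never fires, here is a minimal, self-contained Java sketch. It does not use Twitter4J at all: FakeStream is a hypothetical stand-in which, like TwitterStream.filter according to the linked source, starts its own worker thread and returns right away. The names StreamTimeoutDemo, FakeStream, submitAndWait and runFor are invented for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamTimeoutDemo {

    /** Hypothetical stand-in for TwitterStream: start() spawns a worker thread and returns at once. */
    static class FakeStream {
        private final AtomicBoolean running = new AtomicBoolean(false);
        final AtomicInteger delivered = new AtomicInteger();
        private Thread worker;

        void start() {
            running.set(true);
            worker = new Thread(() -> {
                while (running.get()) {
                    delivered.incrementAndGet(); // pretend a status arrived
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        void stop() throws InterruptedException {
            running.set(false);
            worker.join(); // wait until the worker has really finished
        }
    }

    /** Submits start() to an executor and reports whether future.get() hit the timeout. */
    static boolean submitAndWait(FakeStream stream, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable task = stream::start;
            Future<?> future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // the task finished in time, so no TimeoutException
        } catch (TimeoutException e) {
            return true;
        } finally {
            executor.shutdownNow(); // does not touch the stream's own worker thread
        }
    }

    /** Lets the stream run while the caller sleeps, then stops it and returns the delivered count. */
    static int runFor(FakeStream stream, long millis) throws InterruptedException {
        stream.start();
        try {
            Thread.sleep(millis);
        } finally {
            stream.stop();
        }
        return stream.delivered.get();
    }

    public static void main(String[] args) throws Exception {
        FakeStream first = new FakeStream();
        System.out.println("timed out: " + submitAndWait(first, 2000));
        first.stop(); // the worker was still running after the future completed

        System.out.println("delivered: " + runFor(new FakeStream(), 200));
    }
}
```

Running main should print "timed out: false" first, because the submitted task completes as soon as start() returns, while the worker thread keeps going until stop() is called. The delivered count depends on timing.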