I'm trying to save tweets from Twitter using twitter-streaming, but I have one problem: my program stops working after some period of time (it depends on the batch interval; with 1 ms it runs for about 4-5 seconds). Could you help me solve this problem? What is wrong?
When the batch interval is about 100 ms, I see log records like these:
19/08/06 23:45:26 INFO BlockRDD: Removing RDD 103 from persistence list
19/08/06 23:45:26 INFO BlockManager: Removing RDD 103
19/08/06 23:45:26 INFO TwitterInputDStream: Removing blocks of RDD BlockRDD[103] at createStream at Twitter.java:35 of time 1565124324340 ms
19/08/06 23:45:26 INFO ReceivedBlockTracker: Deleting batches: 1565124324320 ms
19/08/06 23:45:26 INFO InputInfoTracker: remove old batch metadata: 1565124324320 ms
-------------------------------------------
Time: 1565124325500 ms
When the batch interval is "big" and no data is available, I just see messages about the Spark UI starting and then stopping.
package TwitterAnalysis;
import org.apache.spark.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.twitter.*;
import twitter4j.Status;
public class Twitter {
    private static void setTwitterOAuth() {
        System.setProperty("twitter4j.oauth.consumerKey", TwitterOAuthKey.consumerKey);
        System.setProperty("twitter4j.oauth.consumerSecret", TwitterOAuthKey.consumerSecret);
        System.setProperty("twitter4j.oauth.accessToken", TwitterOAuthKey.accessToken);
        System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterOAuthKey.accessTokenSecret);
    }

    public static void main(String[] args) {
        setTwitterOAuth();
        SparkConf conf = new SparkConf().setMaster("local[*]")
                .setAppName("SparkTwitter");
        // JavaSparkContext sparkContext = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));
        JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
        // Stream that contains just tweets in english
        JavaDStream<Status> enTweetsDStream = twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());
        enTweetsDStream.print();
        jssc.start();
    }
}
According to this answer: "Spark 2.0.0 twitter streaming driver is no longer available", the twitter-streaming driver is not available in Spark 2.0 and higher. The solution is to choose an earlier version of Spark.