Tags: scala, apache-spark, twitter, twitter-streaming-api

Search for a specific keyword using the Twitter API and Spark


I tried the following code, replacing "#" with "#Apple" in the hashtag filter:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
ssc.checkpoint("checkpoint") // placeholder directory; a windowed reduce with an inverse function requires checkpointing
val tweets = TwitterUtils.createStream(ssc, None)
val statuses = tweets.map(status => status.getText())
val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
val hashtags = tweetwords.filter(word => word.startsWith("#"))
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, Seconds(1000), Seconds(1))
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
sortedResults.print()
ssc.start()
ssc.awaitTermination()

But I didn't get any results.

Does this stream limit how many tweets it fetches, or which region they come from? I also tried #OPPO, which was trending on my Twitter account, but still got no results.
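The per-batch part of the pipeline (split on spaces, keep hashtags, count, sort) can be mirrored on plain Scala collections to check the matching logic in isolation. This is a sketch, not Spark code: `HashtagBatch` and `countHashtags` are hypothetical names, and the sample tweet texts are made up. It shows that when no token in a batch starts with the prefix, the result is simply empty, which is what an empty `print()` output corresponds to.

```scala
// Plain-Scala sketch of the per-batch logic; no Spark involved.
object HashtagBatch {
  // Mirrors the DStream chain: split on spaces, keep tokens that start with
  // `prefix`, count each distinct token, then sort by count (descending).
  // Ties are broken alphabetically so the order is deterministic.
  def countHashtags(texts: Seq[String], prefix: String = "#"): Seq[(String, Int)] =
    texts
      .flatMap(text => text.split(" ").toSeq)
      .filter(word => word.startsWith(prefix))
      .groupBy(identity)
      .map { case (tag, occurrences) => (tag, occurrences.size) }
      .toSeq
      .sortBy { case (tag, count) => (-count, tag) }

  def main(args: Array[String]): Unit = {
    // Made-up tweet texts standing in for one batch.
    val batch = Seq(
      "New phone day #Apple #iPhone",
      "Loving the camera #OPPO",
      "Keynote tonight #Apple"
    )
    println(countHashtags(batch))           // every hashtag in the batch
    println(countHashtags(batch, "#Apple")) // only tokens starting with "#Apple"
    println(countHashtags(Seq("no hashtags in this one"), "#Apple")) // nothing matches
  }
}
```

Note that `startsWith("#Apple")` also matches longer tags such as `#AppleWatch`, and it is case-sensitive, so `#apple` would not be counted.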


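Solution

Without filters, TwitterUtils.createStream(ssc, None) reads Twitter's public sample stream, a small random sample (roughly 1%) of all public tweets that is not tailored to your region or your timeline, so a particular hashtag may not appear at all even when it is trending for you. Passing the keywords as filters makes the receiver track those terms on the filtered stream instead. Note also that reduceByKeyAndWindow with an inverse function requires a checkpoint directory.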

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
    // reduceByKeyAndWindow with an inverse function needs a checkpoint directory (placeholder path)
    ssc.checkpoint("checkpoint")
    // The keywords you want to track can be passed as a sequence of filters
    val filters: Seq[String] = Seq("#Rajasthan", "#Apple")
    val tweets = TwitterUtils.createStream(ssc, None, filters)
    val statuses = tweets.map(status => status.getText())
    val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
    val hashtags = tweetwords.filter(word => word.contains("#"))
    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, Seconds(1000), Seconds(1))
    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()
    ssc.start()
    ssc.awaitTermination()
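The `reduceByKeyAndWindow` call passes both a reduce function `(x, y) => x + y` and an inverse `(x, y) => x - y`: instead of re-summing every batch in the 1000-second window, Spark adds the newest batch and subtracts the batch that slid out. Because this incremental version builds on the previous window's result, the Spark documentation requires checkpointing for it. The sketch below simulates that bookkeeping with plain Scala maps, measuring the window in batches rather than seconds; `SlidingWindowCounts`, `combine` and `slide` are hypothetical names, and this illustrates the idea rather than Spark's implementation.

```scala
import scala.collection.mutable

// Plain-Scala simulation of an invertible windowed reduce (not Spark's code).
object SlidingWindowCounts {
  type Counts = Map[String, Int]

  // Apply `op` key by key: `_ + _` folds a batch in, `_ - _` takes it back out.
  def combine(window: Counts, batch: Counts, op: (Int, Int) => Int): Counts =
    batch.foldLeft(window) { case (acc, (tag, n)) =>
      acc.updated(tag, op(acc.getOrElse(tag, 0), n))
    }

  // Returns the window's counts after each batch; the window spans
  // `windowBatches` batches (Spark's window is given in seconds instead).
  def slide(batches: Seq[Counts], windowBatches: Int): Seq[Counts] = {
    val inWindow = mutable.Queue.empty[Counts]
    val results = mutable.ListBuffer.empty[Counts]
    var window: Counts = Map.empty
    for (batch <- batches) {
      window = combine(window, batch, _ + _) // like the reduce function
      inWindow.enqueue(batch)
      if (inWindow.size > windowBatches)
        window = combine(window, inWindow.dequeue(), _ - _) // like the inverse function
      results += window
    }
    results.toList
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Map("#Apple" -> 1),
      Map("#Apple" -> 2, "#OPPO" -> 1),
      Map("#OPPO" -> 1)
    )
    slide(batches, windowBatches = 2).foreach(println)
  }
}
```

As in this simulation, a key whose count drops to zero stays in the window unless it is removed explicitly; the overload of `reduceByKeyAndWindow` that takes an inverse function also accepts an optional `filterFunc` for that purpose.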