
Creating a stream of tweets in Spark with TwitterUtils.createStream


I'm trying to create a stream of tweets in Spark using Scala and Twitter4J. Here is a snippet of my code:

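    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import twitter4j.TwitterFactory

    // OAuth credentials (left blank here)
    object auth {
      val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey("")
        .setOAuthConsumerSecret("")
        .setOAuthAccessToken("")
        .setOAuthAccessTokenSecret("")
        .build
    }

    // Local streaming context with 1-second batches
    val conf = new SparkConf().setMaster("local[2]").setAppName("Tutorial")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Obtain a twitter4j.auth.Authorization from the configuration
    val twitter_auth = new TwitterFactory(auth.config)
    val a = new twitter4j.auth.OAuthAuthorization(auth.config)
    val atwitter = twitter_auth.getInstance(a).getAuthorization()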

When I call createStream:

val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)

I get this error:

[error] /home/shaza90/Desktop/streaming/scala/Tutorial.scala:30: overloaded method value createStream with alternatives:
[error]   (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization,filters: Array[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
[error]   (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
[error]  cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.Authorization, Seq[String], org.apache.spark.storage.StorageLevel)
[error]     val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)
[error]                               ^
[error] one error found
[error] (compile:compile) Compilation failed

I don't understand why my arguments don't match either of the overloads. Can you help? Also, when I replace atwitter (the authorization object) with None, it compiles successfully.


Solution

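  • atwitter must be an Option[twitter4j.auth.Authorization]. As the compiler error shows, the Scala overload takes (StreamingContext, Option[Authorization], Seq[String], StorageLevel), while the other one is the Java API (JavaStreamingContext, Authorization, Array[String], StorageLevel). A StreamingContext plus a bare Authorization matches neither. This is also why None compiles: None is an Option, so it fits the Scala overload. You can use: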

    val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
    

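    instead of

    val atwitter = twitter_auth.getInstance(a).getAuthorization()

    With that change, your original call compiles unchanged:

    val tweets = TwitterUtils.createStream(ssc, atwitter, filters, DISK_ONLY_2)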
    

    Alternatively, keep atwitter as a plain Authorization and wrap it at the call site: TwitterUtils.createStream(ssc, Some(atwitter), filters, DISK_ONLY_2).

    For reference, here is Spark's test suite for this API: https://github.com/apache/spark/blob/master/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
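To see why a bare Authorization fails while Some(...) and None both compile, here is a minimal, self-contained sketch. The types StreamingContext, JavaStreamingContext, Authorization and OAuthAuthorization below are hypothetical stand-ins declared locally, not the real Spark or Twitter4J classes; the two createStream methods only mirror the parameter lists from the compiler error (storageLevel omitted for brevity).

```scala
// Hypothetical stand-ins, declared here only to mirror the two createStream
// overloads from the compiler error. These are NOT the real Spark/Twitter4J types.
object OverloadDemo {
  trait Authorization
  final class OAuthAuthorization extends Authorization
  final class StreamingContext
  final class JavaStreamingContext

  // Mirrors the Scala overload: StreamingContext, Option[Authorization], Seq[String]
  def createStream(ssc: StreamingContext,
                   twitterAuth: Option[Authorization],
                   filters: Seq[String]): String =
    twitterAuth match {
      case Some(_) => "scala overload, explicit auth, filters=" + filters.mkString(",")
      case None    => "scala overload, default auth, filters=" + filters.mkString(",")
    }

  // Mirrors the Java overload: JavaStreamingContext, bare Authorization, Array[String]
  def createStream(jssc: JavaStreamingContext,
                   twitterAuth: Authorization,
                   filters: Array[String]): String =
    "java overload, filters=" + filters.mkString(",")

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext
    val auth: Authorization = new OAuthAuthorization
    val filters: Seq[String] = Seq("spark")

    // Does not compile: (StreamingContext, Authorization, Seq[String]) matches neither overload.
    // createStream(ssc, auth, filters)

    // Compiles: Some(auth) is an Option[Authorization], so the Scala overload applies.
    println(createStream(ssc, Some(auth), filters))

    // Also compiles: None conforms to Option[Authorization].
    println(createStream(ssc, None, filters))
  }
}
```

Uncommenting the first call reproduces the same kind of "cannot be applied to" error. With the real TwitterUtils, None compiles for the same reason, but it does not use your auth.config: as far as I can tell from the Spark source, it falls back to Twitter4J's default configuration (for example the twitter4j.oauth.* system properties). Wrapping your own authorization in Some(...) is what you want here.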