Tags: scala, apache-spark, twitter, spark-streaming

Testing Twitter with Spark Streaming API


I am new to Spark's streaming framework and am trying to process the Twitter stream. I am in the process of writing test cases for it, and I understand that I can use StreamingSuiteBase (from spark-testing-base), which lets me feed test input to my functions as a stream. However, I have written a function that takes a DStream[Status] as input and, after processing, returns a DStream[String]. The API I am using from StreamingSuiteBase is testOperation.

test("Filter only words Starting with #") {
  val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
  val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))

  testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
}

And this is the function that receives the input:

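def getText(englishTweets: DStream[Status]): DStream[String] = {
  // Prints the DStream object itself, not the tweets it contains
  println(englishTweets.toString)

  // Split each tweet's text on spaces and keep only the tokens that start with "#"
  val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))

  hashTags
}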

But I get a "type mismatch" error: my test input is a list of Strings, while getText takes a DStream[Status] and returns a DStream[String]. How do I mock a DStream[Status]?
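A side note on design: the per-tweet logic inside getText does not depend on Spark, so one common way to test it without constructing any Status objects is to pull it out into a plain function. The sketch below is a hypothetical refactoring (HashtagExtraction and extractHashtags are illustrative names, not from the question) that mirrors the split-and-filter used in getText and can be checked with ordinary assertions.

```scala
// Hypothetical refactoring: the per-tweet logic of getText as a pure function,
// so it can be unit-tested without Spark or twitter4j.
object HashtagExtraction {

  // Splits the tweet text on single spaces and keeps the tokens that start with '#',
  // the same filter that getText applies inside flatMap.
  def extractHashtags(text: String): List[String] =
    text.split(" ").filter(_.startsWith("#")).toList

  def main(args: Array[String]): Unit = {
    println(extractHashtags("this is #firstHash"))
    println(extractHashtags("#a b #c"))
  }
}
```

getText could then delegate to it with `englishTweets.flatMap(status => HashtagExtraction.extractHashtags(status.getText))`, leaving only the thin Spark wiring to be covered by StreamingSuiteBase.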


Solution

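  • I resolved this issue by building real Status objects with the "createStatus" method of TwitterObjectFactory instead of mocking them. There was no need to mock Status at all, and even if you manage to mock it, you run into serialization issues, because Spark serializes the input data it streams through the test. This is the approach that worked best for me: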

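    import scala.io.Source
    import twitter4j.TwitterObjectFactory
    val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines().mkString
    val tweetStatus = TwitterObjectFactory.createStatus(rawJson) // parse the raw tweet JSON into a twitter4j Status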
    

    Hope this helps someone!
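Combining the answer's approach with the test from the question, a suite might look like the sketch below. It assumes spark-testing-base's StreamingSuiteBase, a ScalaTest version that still ships org.scalatest.FunSuite, and twitter4j 4.x (where TwitterObjectFactory lives). The helper statusWithText is hypothetical, and it assumes createStatus accepts a trimmed-down JSON object with only id, created_at and text; if your twitter4j version rejects it, load a full tweet payload from a resource file as the answer does. The copy of getText is included only to keep the block self-contained.

```scala
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.streaming.dstream.DStream
import org.scalatest.FunSuite
import twitter4j.{Status, TwitterObjectFactory}

// Copy of the function under test from the question (without the println).
object TransformTweets {
  def getText(englishTweets: DStream[Status]): DStream[String] =
    englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))
}

class TransformTweetsSuite extends FunSuite with StreamingSuiteBase {

  // Hypothetical helper: builds a real (serializable) twitter4j Status from a
  // trimmed-down tweet JSON. Assumes createStatus accepts these few fields;
  // a full tweet payload from a test resource is the safer fixture.
  private def statusWithText(id: Long, text: String): Status =
    TwitterObjectFactory.createStatus(
      s"""{"id": $id, "created_at": "Wed Aug 27 13:08:45 +0000 2008", "text": "$text"}""")

  test("Filter only words starting with #") {
    // One inner List per batch; each batch carries a single Status.
    val inputTweets: List[List[Status]] = List(
      List(statusWithText(1L, "this is #firstHash")),
      List(statusWithText(2L, "this is #secondHash")),
      List(statusWithText(3L, "this is #thirdHash")))
    val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))

    // The input element type (Status) and output type (String) now match
    // getText's DStream[Status] => DStream[String] signature.
    testOperation(inputTweets, TransformTweets.getText _, expected, ordered = false)
  }
}
```

The input now has type List[List[Status]] instead of List[List[String]], which is what removes the "type mismatch" from the question.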