I am new to Spark Streaming and am trying to process the Twitter stream. I am writing test cases for it and understand that I can use StreamingSuiteBase from spark-testing-base, which lets me feed test input to my functions as a stream. However, the function I have written takes a DStream[Status] as input and, after processing, returns a DStream[String]. The API I am using from StreamingSuiteBase is testOperation.
test("Filter only words Starting with #") {
val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))
testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
}
And this is the function that receives the input:
def getText(englishTweets: DStream[Status]): DStream[String] = {
println(englishTweets.toString)
val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))
hashTags
}
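The hashtag filter inside getText is plain string handling, so that part can be checked on its own without Spark or twitter4j. A minimal sketch, assuming a hypothetical helper named extractHashTags (not part of my original code) that holds the same expression used in the flatMap:

```scala
// Hypothetical helper for illustration: the same split-and-filter
// expression that getText applies to each tweet's text.
object HashTagSketch {
  def extractHashTags(text: String): Array[String] =
    text.split(" ").filter(_.startsWith("#"))

  def main(args: Array[String]): Unit = {
    val tags = extractHashTags("this is #firstHash")
    println(tags.mkString(",")) // prints #firstHash
  }
}
```

This only covers the string logic; it says nothing about how the DStream[Status] input is built.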
But I am getting a "type mismatch" error between DStream[Status] and DStream[String], because the test input holds plain strings while getText expects a DStream[Status]. How do I mock a DStream[Status]?
So, I resolved this issue by creating the Twitter status with the createStatus API of TwitterObjectFactory. There was no need to mock the Twitter Status. Even if you manage to mock it, there are serialization issues. So, this is the best solution:
import scala.io.Source
import twitter4j.TwitterObjectFactory
val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines.mkString
val tweetStatus = TwitterObjectFactory.createStatus(rawJson)
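To tie this back to the test in the question, the input lists can hold Status objects built from JSON instead of plain strings. A rough sketch, assuming spark-testing-base, ScalaTest and twitter4j are on the test classpath, and that TransformTweets.getText is the function from the question. The suite name and the statusWithText helper are made up for illustration. I am also assuming that a JSON object containing only a text field is enough for createStatus to parse, which I have not verified; a full tweet payload such as tweetStatus.json is the safer choice. Newer ScalaTest versions name the base class AnyFunSuite instead of FunSuite.

```scala
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.scalatest.FunSuite
import twitter4j.{Status, TwitterObjectFactory}

class TransformTweetsSuite extends FunSuite with StreamingSuiteBase {

  // Hypothetical helper: wraps the text in a minimal JSON object and parses it.
  // Assumes the text needs no JSON escaping and that a text-only payload parses.
  private def statusWithText(text: String): Status =
    TwitterObjectFactory.createStatus(s"""{"text": "$text"}""")

  test("Filter only words starting with #") {
    val inputTweet = List(
      List(statusWithText("this is #firstHash")),
      List(statusWithText("this is #secondHash")),
      List(statusWithText("this is #thirdHash")))
    val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))

    // The input element type is now Status, matching getText's DStream[Status] parameter.
    testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
  }
}
```

Because the input is now a List[List[Status]], the element type lines up with the DStream[Status] parameter of getText, which is what the original type mismatch was about.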
Hope this helps someone!