I want to replace the string "a" with an array of Strings, so that .contains() checks against every String in the array. Is that possible?
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))
Edit:
I also tried this (sc is the SparkContext):
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))
And got the following error:
java.io.NotSerializableException: Object of org.apache.spark.streaming.twitter.TwitterInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
Then I tried to broadcast the array before it is used:
val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))
And got the same error.
Thanks
As I understand the question, you want to keep only the words of the split status text that also appear in a, so that the result is a subset of a:
val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(word => a.contains(word)))
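If what you want is a substring match rather than exact membership, the predicate inside filter changes slightly. Here is a minimal sketch on plain Scala collections (no Spark needed), assuming the three readings of "check for every String in the array" below. The object name WordFilters and its helper methods are hypothetical names made up for this illustration; they are not part of Spark or any library.

```scala
object WordFilters {
  // Keep words that are exactly equal to one of the keywords.
  def keepExact(words: Array[String], keywords: Set[String]): Array[String] =
    words.filter(word => keywords.contains(word))

  // Keep words that contain at least one keyword as a substring.
  def keepContainingAny(words: Array[String], keywords: Seq[String]): Array[String] =
    words.filter(word => keywords.exists(k => word.contains(k)))

  // Keep words that contain every keyword as a substring.
  def keepContainingAll(words: Array[String], keywords: Seq[String]): Array[String] =
    words.filter(word => keywords.forall(k => word.contains(k)))

  def main(args: Array[String]): Unit = {
    val a = Array("a1", "a2")
    val words = "a1 xa1a2 b a2".split(" ")

    println(keepExact(words, a.toSet).mkString(", "))         // a1, a2
    println(keepContainingAny(words, a.toSeq).mkString(", ")) // a1, xa1a2, a2
    println(keepContainingAll(words, a.toSeq).mkString(", ")) // xa1a2
  }
}
```

Whichever predicate matches your intent can be used as the body of .filter(...) inside the flatMap, in place of the exact-membership check.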