I'm not really sure how to do this with Spark Streaming; it is really giving me a headache. I have an input stream of users, for example User1, User2, etc., with some parameters that are not important for now. I then do some processing on those parameters and get the user's current state, which can be, for example, A, B, or C, like this:
(User1, A)
(User2, B)
(User3, B)
At some point in the near future I get a new set of events, which might look like this:
(User1, C)
(User2, B)
(User3, A)
So far, no problem. The tricky part is how to generate an RDD with a structure like this, covering all of the events from the last X seconds:
(User1, (A, C))
(User2, (B, B))
(User3, (B, A))
If it is not obvious: I would like to generate a list of the previous states each user was in during the last X seconds. Is there a way of achieving this with Spark Streaming? Until now I would usually do this with Esper, but Spark Streaming still looks like a useful tool for the job.
I'm not limiting myself to tuples in the examples above; the parentheses are only there to explain the process.
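To make the target structure concrete, here is a minimal sketch on plain Scala collections (no Spark involved; the user and state names are taken from the example above) of collapsing a window of (user, state) events into per-user state lists:

```scala
// Two consecutive batches of (user, state) events, as in the example
val batch1 = Seq(("User1", "A"), ("User2", "B"), ("User3", "B"))
val batch2 = Seq(("User1", "C"), ("User2", "B"), ("User3", "A"))

// All events that fell into the window (here: both batches)
val window = batch1 ++ batch2

// Group by user, keeping each user's states in arrival order
val byUser: Map[String, Seq[String]] =
  window.groupBy(_._1).map { case (user, events) => (user, events.map(_._2)) }

// byUser("User1") == Seq("A", "C")
```

This is the shape a windowed grouping in Spark Streaming would produce per batch interval, just computed eagerly on in-memory data.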
Edit:
Here is the code I have so far:
val userZone = positions.map(position =>
  // classify each position into a zone based on its coordinates
  if (position.latitude > 45.445 && position.latitude < 45.48 &&
      position.longitude > 9.14 && position.longitude < 9.24)
    (position, "center")
  else
    (position, "out"))

val groupedByZones = userZone.groupByKeyAndWindow(Seconds(120))

groupedByZones.foreachRDD(rdd => {
  rdd.foreach(record => {
    println(record._1.userid + ":" + record._2)
  })
})
So my desired output should be something like:
U1: (center, out, center)
U2: (center, center, out)
...
But for now I'm only getting separate values like:
U1 : center
U1 : out
U1 : center
I hope I'm being clear.
So there are a few ways to do this in Spark Streaming. The simplest that comes to mind is groupByKeyAndWindow (see the PairDStreamFunctions API docs at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
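One thing worth noting about the code in the question: groupByKeyAndWindow groups by the DStream's key, and there the key is the whole position object, so every record ends up in its own group. Keying by userid instead should give the grouped output. Here is a sketch of that corrected mapping, with the windowed grouping simulated on plain collections so it runs without a cluster (Position is a hypothetical case class matching the fields used in the question):

```scala
// Hypothetical position record matching the fields used in the question
case class Position(userid: String, latitude: Double, longitude: Double)

// Pure zone classification, same bounds as in the question's code
def zone(p: Position): String =
  if (p.latitude > 45.445 && p.latitude < 45.48 &&
      p.longitude > 9.14 && p.longitude < 9.24) "center"
  else "out"

// A window's worth of positions for one user
val window = Seq(
  Position("U1", 45.46, 9.20), // center
  Position("U1", 45.50, 9.20), // out
  Position("U1", 45.47, 9.18)  // center
)

// Key by userid (not by the whole Position), then group; this mirrors
// what groupByKeyAndWindow does per window once the DStream is keyed
// as (userid, zone) pairs.
val grouped: Map[String, Seq[String]] =
  window.map(p => (p.userid, zone(p)))
        .groupBy(_._1)
        .map { case (user, kvs) => (user, kvs.map(_._2)) }

// grouped("U1") == Seq("center", "out", "center")
```

On the actual DStream the analogous pipeline would look something like `positions.map(p => (p.userid, zone(p))).groupByKeyAndWindow(Seconds(120))`, producing one (userid, states) record per user per window.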