Tags: scala, apache-spark, real-time, spark-streaming

Spark Streaming RDD with a list of previous values


I'm not really sure how to do this with Spark Streaming; it is really giving me a headache. I have an input stream of some users, for example User1, User2, etc., with some parameters, which are not important for now. Then I do some processing on those parameters and get the current state of each user, which can be, for example, A, B, C, like this:

   (User1, A)
   (User2, B)
   (User3, B)

At some point in the near future I get a new set of events, which might look like this:

   (User1, C)
   (User2, B)
   (User3, A)

So far, no problem. The tricky part is how to generate an RDD with a structure like this, covering all of the events in the last X seconds:

   (User1, (A, C))
   (User2, (B, B))
   (User3, (B, A))

In other words, I would like to generate a list of the previous states each user was in during the last X seconds. Is there a way of achieving this with Spark Streaming? Until now I would usually do this with Esper, but Spark Streaming still looks like a good fit for the job.

I'm not limiting myself to a tuple in the examples above; the parentheses are only for illustrating the process.
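For a single window's worth of events, the desired transformation can be sketched in plain Scala, with no Spark involved. Here `events` is a hypothetical batch of (user, state) pairs taken from the example above:

```scala
// A hypothetical batch of (user, state) events from the last X seconds,
// in arrival order, matching the example above.
val events = Seq(
  ("User1", "A"), ("User2", "B"), ("User3", "B"),
  ("User1", "C"), ("User2", "B"), ("User3", "A")
)

// Group by user, keeping each user's states in arrival order --
// this is the shape of result the question is asking for.
val byUser: Map[String, Seq[String]] =
  events.groupBy(_._1)
        .map { case (user, pairs) => user -> pairs.map(_._2) }

// byUser("User1") == Seq("A", "C")
// byUser("User3") == Seq("B", "A")
```

`groupBy` preserves the relative order of elements within each group, so the per-user state lists come out in arrival order.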

Edit:

Here is a code example I have so far.

val userZone = positions.map(position =>
      if (
        position.latitude > 45.445 &&
          position.latitude < 45.48 &&
          position.longitude > 9.14 &&
          position.longitude < 9.24)
        (position, "center")
      else (position, "out"))

    val groupedByZones = userZone.groupByKeyAndWindow(Seconds(120))
    groupedByZones.foreachRDD(rdd =>
    {
      rdd.foreach(record => {
        println(record._1.userid + ":" + record._2)
      })
    })

So my desired output should be something like:

U1: (center, out, center)
U2: (center, center, out) ...

But for now I'm only getting separate values like:

U1 : center
U1 : out
U1 : center

I hope I'm being clear.


Solution

  • So there are a few ways to do this in Spark Streaming. The simplest that comes to mind is groupByKeyAndWindow (see PairDStreamFunctions in the Scaladoc: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). Note that for the grouping to collect states per user, the stream has to be keyed by the user id before the call, not by the whole position record, which is unique for every event.
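A minimal sketch of that fix, in plain Scala so it runs without a cluster: the position record is collapsed to its `userid` before grouping, and the grouping below mirrors what `positions.map(p => (p.userid, zoneOf(p))).groupByKeyAndWindow(Seconds(120))` would produce per window in the streaming job. The `Position` case class and the sample coordinates are assumptions for illustration:

```scala
// Hypothetical stand-in for the question's position records.
case class Position(userid: String, latitude: Double, longitude: Double)

// Same zone test as in the question: inside the bounding box => "center".
def zoneOf(p: Position): String =
  if (p.latitude > 45.445 && p.latitude < 45.48 &&
      p.longitude > 9.14 && p.longitude < 9.24) "center"
  else "out"

// Events that arrived within one 120-second window, in order.
val window = Seq(
  Position("U1", 45.46, 9.20),  // center
  Position("U1", 45.50, 9.30),  // out (latitude above the box)
  Position("U1", 45.47, 9.18),  // center
  Position("U2", 45.46, 9.15),  // center
  Position("U2", 45.40, 9.00)   // out
)

// Key by userid -- not by the whole Position -- then group.
// In the streaming job this pair of steps becomes:
//   positions.map(p => (p.userid, zoneOf(p))).groupByKeyAndWindow(Seconds(120))
val zonesByUser: Map[String, Seq[String]] =
  window.map(p => (p.userid, zoneOf(p)))
        .groupBy(_._1)
        .map { case (user, pairs) => user -> pairs.map(_._2) }

// zonesByUser("U1") == Seq("center", "out", "center")
```

Keying by the whole position object is why the original code printed one line per event: every position is distinct, so every group had exactly one element.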