Tags: scala, apache-spark, cassandra, rdd, spark-cassandra-connector

Filter RDD based on timestamp


I have the following code:

val imei = "86656"
val date = "2017-04-09"
val gpsdt = "2017-04-09 00:20:10"
val rdd = sc.cassandraTable("test", "xyz").select("id", "date", "dttime").where("id=? and date=?", imei, date)

So now I have an RDD containing all the data for a particular imei on a specific date, but I want to filter it down to 2 rows based on the given "gpsdt": one row with the timestamp just greater than the given time, and a second row with the timestamp just less than it. How can I achieve that?

My Cassandra DB schema is:

create table xyz (id text, date text, dttime timestamp, roll text, primary key ((id, date), dttime));

Thanks,


Solution

  • You can separate the RDD into two:

    1. Rows with dttime greater than gpsdt: order by dttime ascending and take the first one.

    2. Rows with dttime less than gpsdt: order by dttime descending and take the first one.

    Finally, union them and you have your desired rows.

    Programmatically:

    // Compare timestamps as epoch millis rather than strings, to avoid timestamp-format mismatches
    val gpsdtMs = java.sql.Timestamp.valueOf(gpsdt).getTime
    val justGreater = rdd.filter(_.getDate("dttime").getTime > gpsdtMs).sortBy(_.getDate("dttime").getTime).take(1)
    val justLess = rdd.filter(_.getDate("dttime").getTime < gpsdtMs).sortBy(_.getDate("dttime").getTime, ascending = false).take(1)
    val result = justGreater ++ justLess
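
    Note that take(1) already returns an Array[CassandraRow] to the driver, so the final ++ is a plain array concatenation rather than an RDD union; with at most two rows, that is cheap.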
    

    Thanks to @Alex Ott for pointing to https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
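
    Since dttime is the clustering column and the full partition key (id, date) is already fixed, the same two lookups can also be pushed down to Cassandra itself, as the loading docs linked above describe. A minimal sketch, assuming the connector's .where, .limit and .withDescOrder behave as documented (variable names are illustrative, and gpsdt is converted to a java.sql.Timestamp so the bind value matches the timestamp column):

    import java.sql.Timestamp
    import com.datastax.spark.connector._

    val gpsdtTs = Timestamp.valueOf(gpsdt)

    // First row strictly after gpsdt: clustering order is already ascending, so limit(1) suffices
    val justAfter = sc.cassandraTable("test", "xyz")
      .select("id", "date", "dttime")
      .where("id = ? and date = ? and dttime > ?", imei, date, gpsdtTs)
      .limit(1)
      .collect()

    // First row strictly before gpsdt: reverse the clustering order so limit(1) picks the closest row
    val justBefore = sc.cassandraTable("test", "xyz")
      .select("id", "date", "dttime")
      .where("id = ? and date = ? and dttime < ?", imei, date, gpsdtTs)
      .withDescOrder
      .limit(1)
      .collect()

    val closest = justAfter ++ justBefore

    This way Cassandra returns exactly one row per query, instead of the whole partition being pulled into Spark and filtered there.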