I have following Cassandra table:
CREATE TABLE listener.snapshots_geohash
(
created_date text, -- date when record have come to the system
geo_part text, -- few signs of geo hash - just for partitioning
when timestamp, -- record creation date
device_id text, -- id of device produced json data (see snapshot column)
snapshot text, -- json data, should be aggregated by spark
PRIMARY KEY ((created_date, geo_part), when, device_id)
)
Every morning aggregating application should load data for previous day and aggregate JSON data from snapshot column. Aggregation will group data by geohash, that's why its part were selected to be part of partition key.
I know that it is efficient to load data from Cassandra by using joinWithCassandraTable - but for that I have to get RDD constructed from (created_date, geo_part) pairs. While I know created_date value, I can't list geo_part values - since it is just part of geohash and its values are not continuous. So I have somehow to run select distinct created_date, geo_part from ks.snapshots
and create RDD from its results. The question is how to run this select with spark 2.0.2 and cassandra-connector 2.0.0-M3 or perhaps there alternative way?
I found the way to fetch Cassandra partition keys by running CQL query with CassandraConnector:
val cassandraConnector = CassandraConnector(spark.sparkContext.getConf)
val distinctRows = cassandraConnector.withSessionDo(session => {
session.execute(s"select distinct created_date, geo_part from ${keyspace}.$snapshots_table")
}).all().map(row => {TableKeyM(row.getString("created_date"), row.getString("geo_part"))}).filter(k => {days.contains(k.created_date)})
val data_x = spark.sparkContext.parallelize(distinctRows)
The table structure design has following problem: Cassandra disallows to add WHERE created_date='...' clause to select distinct created_date, geo_part and it is required to fetch whole list of pairs and filter it in application.
Alternative solution could be making partition keys continuous. In case if aggregation would be done by hours - then partition key could be (created_date, hour) and 24 hours could be listed in application. If 24 partitions per day is not enough, and aggregation have group by by geohash, it is possible to stick with geohash significant part - but it should be translated to something countable - for example geoPart.hash() % desiredNumberOfSubpartitions