I want to read a parquet file and join with some of the columns in Cassandra. I was able to join and get an RDD, but I am unable to parse the resulting RDD. Here are further details
case class IP (key: String, key2: String,key3: String,key4: String,key5: String,key6: String,key7: String,key8: String,key9: String,key10: String,key11: String,key12: String,key13: String,key14: String,key15: String,column1:String,column2:String,column3:String,column4:String,column5:String,value1:String)
val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))
val joinWithRDD = a.joinWithCassandraTable("key","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")
scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19
The resulting RDD schema is shown above. The output of the RDD is something like this.
(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})
(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})
I am not sure how to parse this RDD. I want to sum up the last column from IP and the value1 column from the Cassandra row.
Let me know if any further details are needed. And appreciate the help
Here is the Cassandra table schema
CREATE TABLE aa.tbl (
key text,
key2 text,
key3 text,
key4 text,
key5 text,
key6 text,
key7 text,
key8 text,
key9 text,
key10 text,
key11 text,
key12 text,
key13 text,
key14 text,
key15 text,
column1 text,
column2 text,
column3 text,
column4 text,
column5 text,
value1 text,
value2 blob,
value3 text,
PRIMARY KEY ((key, key2, key3, key4, key5, key6, key7, key8, key9, key10, key11, key12, key13, key14, key15), column1, column2, column3, column4, column5)
) WITH CLUSTERING ORDER BY (column1 ASC, column2 ASC, column3 ASC, column4 ASC, column5 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';
You need to do something like this (didn't check the code, but it should work with small adjustments - I assumed that value1
has integer type in the Cassandra):
joinWithRDD.map { case (ip, row) =>
val newVal = ip.value1.toInteger + row.getInt("value1")
IP(ip.key, key2, .... newVal.toString)
}
the joinWithCassandraTable
returns back a tuple of the your data as _1
, and data found in Cassandra as _2
. When accessing Cassandra data you can use getter functions getInt
, getString
, etc., or you can map Row
into case class, as described in documentation for Spark Cassandra Connector.