apache-spark, cassandra, rdd, spark-cassandra-connector

Parse Spark RDD after Cassandra join


I want to read a parquet file and join it with some of the columns in a Cassandra table. I was able to join and get an RDD, but I am unable to parse the resulting RDD. Here are the details:
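
For context, cs_cube2_2_6 below is the DataFrame read from the parquet file, roughly along these lines (the path is a placeholder):

val cs_cube2_2_6 = spark.read.parquet("/path/to/file.parquet")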

case class IP (key: String, key2: String,key3: String,key4: String,key5: String,key6: String,key7: String,key8: String,key9: String,key10: String,key11: String,key12: String,key13: String,key14: String,key15: String,column1:String,column2:String,column3:String,column4:String,column5:String,value1:String)

val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))


val joinWithRDD = a.joinWithCassandraTable("aa","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")
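
This assumes the connector's implicits (which provide joinWithCassandraTable and SomeColumns) are in scope:

import com.datastax.spark.connector._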

scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19

The type of the resulting RDD is shown above; its elements look something like this:

(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})

(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})

I am not sure how to parse this RDD. I want to sum up the last column from IP and the value1 column from the Cassandra row.

Let me know if any further details are needed. I'd appreciate the help.

Here is the Cassandra table schema:

CREATE TABLE aa.tbl (
key text,
key2 text,
key3 text,
key4 text,
key5 text,
key6 text,
key7 text,
key8 text,
key9 text,
key10 text,
key11 text,
key12 text,
key13 text,
key14 text,
key15 text,
column1 text,
column2 text,
column3 text,
column4 text,
column5 text,
value1 text,
value2 blob,
value3 text,
PRIMARY KEY ((key, key2, key3, key4, key5, key6, key7, key8, key9, key10, key11, key12, key13, key14, key15), column1, column2, column3, column4, column5)
) WITH CLUSTERING ORDER BY (column1 ASC, column2 ASC, column3 ASC, column4 ASC, column5 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';

Solution

  • You need to do something like this (I didn't check the code, but it should work with small adjustments; I assumed that value1 holds an integer value in Cassandra):

    joinWithRDD.map { case (ip, row) =>
      // parse the text value1 from your data, and read the matching column from Cassandra
      val newVal = ip.value1.toInt + row.getInt("value1")
      IP(ip.key, ip.key2, ip.key3, ip.key4, ip.key5, ip.key6, ip.key7, ip.key8,
        ip.key9, ip.key10, ip.key11, ip.key12, ip.key13, ip.key14, ip.key15,
        ip.column1, ip.column2, ip.column3, ip.column4, ip.column5, newVal.toString)
    }
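
    If a single grand total is wanted rather than a per-row sum, a minimal sketch (under the same integer assumption) is:

    val total = joinWithRDD.map { case (ip, row) =>
      ip.value1.toInt + row.getInt("value1")
    }.sum()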
    

    joinWithCassandraTable returns a tuple with your data as _1 and the matching row found in Cassandra as _2. When accessing the Cassandra data you can use getter functions such as getInt, getString, etc., or you can map the row into a case class by passing a type parameter, as described in the documentation for the Spark Cassandra Connector.
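
    As an illustration of the case-class mapping, here is a sketch; Value1Row is a made-up name matching the selected column, and its field stays a String because the table declares value1 as text:

    case class Value1Row(value1: String)

    val typedJoin = a.joinWithCassandraTable[Value1Row]("aa", "tbl")
      .on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15"))
      .select("value1")

    // each element is now (IP, Value1Row), so no getter calls are needed
    val sums = typedJoin.map { case (ip, v) => ip.value1.toInt + v.value1.toInt }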