I am newbie for the Cassandra and I want to implement SCD Type-1 in Cassandra DB.
This SCD Type1 job will be executed from the Spark.
The data will be stored as time series partitioned data. viz: Year/month/Day
Example: I have records for the last 300 days and my new records may have the new records as well as the updated records. I want to compare the updated records for the last 100 days and if the records are new then it should perform the insert operation else update.
I am not getting any clue to perform this operation hence not sharing any CQL :(
Sample table structure is:
CREATE TABLE crossfit_gyms_by_city_New (
country_code text,
state_province text,
city text,
gym_name text,
PRIMARY KEY ((country_code, state_province), gym_name)
) WITH CLUSTERING ORDER BY (gym_name ASC );
My Sample Spark Code:
object SparkUpdateCassandra {
System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
def main(args: Array[String]): Unit = {
val spark = org.apache.spark.sql.SparkSession
.builder()
.master("local[*]")
.config("spark.cassandra.connection.host", "localhost")
.appName("Spark Cassandra Connector Example")
.getOrCreate()
import spark.implicits._
//Read Cassandra data using DataFrame
val FirstDF = Seq(("India", "WB", "Kolkata", "Cult Fit"),("India", "KA", "Bengaluru", "Cult Fit")).toDF("country_code", "state_province","city","gym_name")
FirstDF.show(10)
FirstDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf1 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf1.show(10)
// spark.implicits.wait(5000)
val SecondDF = Seq(("India", "WB", "Siliguri", "CultFit"),("India", "KA", "Bengaluru", "CultFit")).toDF("country_code", "state_province","city","gym_name")
SecondDF.show(10)
SecondDF.write
.format("org.apache.spark.sql.cassandra")
.mode("append")
.option("confirm.truncate", "true")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.option("keyspace", "emc_test")
.option("table", "crossfit_gyms_by_city_new")
.save()
val loaddf2 = spark.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "localhost")
.option("spark.cassandra.connection.port", "9042")
.options(Map( "table" -> "crossfit_gyms_by_city_new", "keyspace" -> "emc_test"))
.load()
loaddf2.show(10)
}
}
Note: I am using Scala for the Spark framework.
In Cassandra everything is upsert - if row doesn't exist it will be inserted, if it exists, then it will be updated, so you just need to get your data into RDD or DataFrame and use corresponding function of Spark Cassandra Connector:
rdd.saveToCassandra("keyspace", "table")
Or just write
inDataFrame API:
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "table_name", "keyspace" -> "keyspace_name"))
.mode(SaveMode.Append)
.save()