Tags: scala, apache-spark, cassandra, spark-cassandra-connector

Update map-type columns in Cassandra with new key-value pairs without completely overwriting the map


This continues the question Insert Spark Dataset[(String, Map[String, String])] to Cassandra Table.

I have a Spark Dataset of type Dataset[(String, Map[String, String])].

I have to insert it into a Cassandra table.

Here, the key in the Dataset[(String, Map[String, String])] will become the primary key of the row in Cassandra.

The Map in the Dataset[(String, Map[String, String])] will go into the column ColumnNameValueMap of the same row.

My Cassandra table structure is:

CREATE TABLE SampleKeyspace.CassandraTable (
  RowKey text PRIMARY KEY,
  ColumnNameValueMap map<text,text>
);

I was able to insert the data into the Cassandra table using the Spark Cassandra Connector.
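
For reference, a minimal sketch of how such an insert can be done with the DataFrame writer (the connection host and sample values are placeholders; unquoted CQL identifiers are folded to lowercase, hence the lowercase keyspace/table/column names):

import org.apache.spark.sql.SparkSession

// Sketch only: the connection host and sample data below are placeholders.
val spark = SparkSession.builder()
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .getOrCreate()
import spark.implicits._

// A Dataset[(String, Map[String, String])] with placeholder data
val ds = Seq(("row1", Map("c1" -> "v1", "c2" -> "v2"))).toDS()

// Unquoted CQL identifiers are stored lowercase, so use lowercase names here.
ds.toDF("rowkey", "columnnamevaluemap")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "samplekeyspace", "table" -> "cassandratable"))
  .mode("append")
  .save()

Note that a DataFrame write like this replaces the entire map value for a given rowkey on every save, which is exactly the overwriting behavior described next.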

Now I am updating the same map column (the 2nd column) with new key-value pairs for the same rowkey (the 1st column/primary key). But every new update to this column wipes out the previous map.

How can I append to the map using the Spark Cassandra Connector?


Solution

  • I don't think it's possible to do directly from the DataFrame API, but it is possible via the RDD API. For example, I have the following table with some test data:

    CREATE TABLE test.m1 (
        id int PRIMARY KEY,
        m map<int, text>
    );
    cqlsh> select * from test.m1;
    id | m
    ----+--------------------
     1 | {1: 't1', 2: 't2'}
    (1 rows)
    

    and I have data in Spark:

    scala> val data = Seq((1, Map(3 -> "t3"))).toDF("id", "m")
    data: org.apache.spark.sql.DataFrame = [id: int, m: map<int,string>]
    

    then I can specify that I want to append data to a specific column with the following code:

    // brings saveToCassandra, SomeColumns and the collection-behavior syntax into scope
    import com.datastax.spark.connector._

    data.rdd.saveToCassandra("test", "m1", SomeColumns("id", "m" append))
    

    and I can see that the data is updated:

    cqlsh> select * from test.m1;
    id | m
    ----+----------------------------- 
     1 | {1: 't1', 2: 't2', 3: 't3'}
    (1 rows)
    

    Besides append, there is support for removing elements with the remove option, and for prepend (lists only); a sketch of the remove case follows below. The documentation contains examples of these.
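
    For illustration, a minimal sketch of remove on the same test table. In CQL, map entries are removed by key (UPDATE test.m1 SET m = m - {2} WHERE id = 1), so the value bound for the column is assumed here to be the set of keys to delete, not key-value pairs; verify the accepted Scala type against your connector version:

    import com.datastax.spark.connector._

    // Assumption: map entries are removed by key, mirroring CQL's `m = m - {2}`,
    // so Set(2) below stands for the keys to delete.
    val keysToDelete = sc.parallelize(Seq((1, Set(2))))
    keysToDelete.saveToCassandra("test", "m1", SomeColumns("id", "m" remove))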