Spark Scala Cassandra connector: deleting all rows fails with "IllegalArgumentException: requirement failed" exception


Create table (group is the partition key, word the clustering column) -

CREATE TABLE test.word_groups (group text, word text, count int, PRIMARY KEY (group, word));

Insert Data -

INSERT INTO test.word_groups (group, word, count) VALUES ('A-group', 'raj', 0);
INSERT INTO test.word_groups (group, word, count) VALUES ('b-group', 'jaj', 0);
INSERT INTO test.word_groups (group, word, count) VALUES ('A-group', 'raff', 3);

SELECT * FROM test.word_groups;

 group   | word | count
---------+------+-------
 b-group |  jaj |     0
 A-group | raff |     3
 A-group |  raj |     0

Script -

import org.apache.spark.sql.functions.col
import com.datastax.spark.connector._

val cassandraUrl = "org.apache.spark.sql.cassandra"
val wordGroup: Map[String, String] = Map("table" -> "word_groups",
  "keyspace" -> "test", "cluster" -> "test-cluster")
// read the rows to delete
val groupData = spark.read.format(cassandraUrl).options(wordGroup).load()
  .where(col("group") === "b-group")
// delete them - this is the call that fails
groupData.rdd.deleteFromCassandra("test", "word_groups")

Exception -

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:102)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:229)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter.delete(TableWriter.scala:194)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21/08/11 09:01:24 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 2953, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.

Spark Version - 2.4.4 and Spark Cassandra Connector version - 2.5.0

Spark Cassandra connector doc link - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#deleting-rows-and-columns

I'm trying to delete the entire rows, including the primary key columns.

Is there any workaround for this?

FYI - I need to delete all the records of group "A-group" from the word_groups table, including the primary key / partition key columns.


Solution

  • It's an interesting change in 2.5.x that I wasn't aware of: you now need to pass rows of the correct size even when keyColumns is specified; it worked without that before. This looks like a bug to me.

    When deleting whole rows you need to keep only the primary key columns, so change the delete to:

    groupData.select("group", "word").rdd.deleteFromCassandra("test", "word_groups")
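
    As a sanity check (a minimal sketch, reusing groupData from the question): after this projection each row carries exactly the two primary-key values (group, word), which is the row size the connector's SqlRowWriter expects for a full-row delete.

    // Hypothetical check: the projected rows now match the primary-key width,
    // so the "Invalid row size: 3 instead of 2" requirement passes.
    val keyed = groupData.select("group", "word")
    println(keyed.columns.length)  // 2 - one value per primary-key column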
    

    But in your case it's even better to delete by the partition key column: Cassandra then writes a single partition-level tombstone instead of one tombstone per row (you still need to select only the necessary columns):

    import com.datastax.spark.connector._
    groupData.select("group").rdd
      .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))
    

    And you don't even need to read the input data from Cassandra: if you know the partition key values, you can simply create an RDD of keys and delete the data (similar to what's shown in the docs):

    case class Key(group: String)
    sc.parallelize(Seq(Key("b-group")))
      .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))