
Spark Cassandra Write UDT With Case-Sensitive Names Fails


  • A Spark Cassandra Connector write fails with java.lang.IllegalArgumentException: udtId is not a field defined in this definition when the UDT uses case-sensitive field names.
  • I need the fields in the Cassandra table to keep their case, so I used quoted identifiers when creating them.

My Cassandra schema:

CREATE TYPE my_keyspace.my_udt (
  "udtId" text,
  "udtValue" text
);

CREATE TABLE my_keyspace.my_table (
  "id" text PRIMARY KEY,
  "someCol" text,
  "udtCol" list<frozen<my_udt>>
);
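For context on the quoting above: CQL folds unquoted identifiers to lower case, while double-quoted identifiers keep their case (a literal double quote inside a quoted name is written as two). The plain-Scala sketch below models only that rule. CqlIdentifiers, internalName and quoteIfNeeded are hypothetical names for illustration, not part of the Cassandra driver or the connector, and quoteIfNeeded ignores reserved keywords. It shows that once the type is created with "udtId", a reference written unquoted as udtId resolves to udtid and no longer matches. That is one plausible reading of the "not a field defined in this definition" error, but the cause is not confirmed here.

```scala
import java.util.Locale

// Illustrative model of CQL identifier rules (not a driver or connector API):
// unquoted identifiers fold to lower case; quoted ones keep their case,
// and "" inside quotes stands for a single literal double quote.
object CqlIdentifiers {
  /** Hypothetical helper: the stored name for an identifier as written in CQL. */
  def internalName(written: String): String =
    if (written.length >= 2 && written.startsWith("\"") && written.endsWith("\""))
      written.substring(1, written.length - 1).replace("\"\"", "\"")
    else
      written.toLowerCase(Locale.ROOT)

  /** Hypothetical helper: how a stored name must be written to refer to it
    * (ignores reserved keywords, which would also need quoting). */
  def quoteIfNeeded(stored: String): String =
    if (stored.matches("[a-z][a-z0-9_]*")) stored
    else "\"" + stored.replace("\"", "\"\"") + "\""

  def main(args: Array[String]): Unit = {
    // Fields created as "udtId" and "udtValue" are stored with their case intact.
    val stored = Set(internalName("\"udtId\""), internalName("\"udtValue\""))
    println(stored)                                     // Set(udtId, udtValue)
    // Writing the name unquoted folds it to udtid, which does not match.
    println(stored.contains(internalName("udtId")))     // false
    println(stored.contains(internalName("\"udtId\""))) // true
    println(quoteIfNeeded("udtId"))                     // "udtId"
  }
}
```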

My Spark DataFrame schema:

root
 |-- id: string (nullable = true)
 |-- someCol: string (nullable = true)
 |-- udtCol: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- udtId: string (nullable = true)
 |    |    |-- udtValue: string (nullable = true)
  • Is there any way to get this write to work other than defining my UDT with lowercase field names? Making them lowercase would force me to add case-conversion code everywhere the type is used, and I'd like to avoid that.
  • Because I couldn't write successfully, I haven't tried reading yet. Is this an issue with reads as well?

Solution

  • You need to upgrade to Spark Cassandra Connector 2.5.0. I can't find the specific commit that fixes it, or a specific Jira issue that mentions it; I suspect it was fixed in the DataStax version first and then released as part of the merge announced here.

    Here is how it works in SCC 2.5.0 + Spark 2.4.6, while it fails with SCC 2.4.2 + Spark 2.4.6:

    scala> import org.apache.spark.sql.cassandra._
    import org.apache.spark.sql.cassandra._
    
    scala> val data = spark.read.cassandraFormat("my_table", "test").load()
    data: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]
    
    scala> val data2 = data.withColumn("id", concat(col("id"), lit("222")))
    data2: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]
    
    scala> data2.write.cassandraFormat("my_table", "test").mode("append").save()
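If the project is built with sbt, picking up the fix amounts to bumping the connector dependency. A minimal sketch, assuming an sbt build against Spark 2.4.6 where Spark itself is provided by the cluster (adjust versions and scopes to your setup):

```scala
// build.sbt (sketch, assumed setup): Spark is provided at runtime,
// the connector is upgraded from 2.4.x to 2.5.0 and bundled with the app.
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "2.4.6" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.5.0"
)
```

For spark-shell, the equivalent is starting it with --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.0 (use the _2.12 artifact if your Spark build uses Scala 2.12).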