scala, apache-spark, cassandra, datastax

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map of type scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)


I have a Spark DataFrame with the following schema.

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

I am writing it to a Cassandra table.

The Cassandra table schema is as follows:

create table provision_bmss.bmss_cust (
  partition_key text,
  row_key text,
  group int,
  attributes map<text,text>,
  data_as_of_date text,
  PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = false
AND default_time_to_live = 34560000;

I am using the DataStax Spark Cassandra Connector to write to the table with the logic below:

val maxItem = 65000

dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map("keyspace" -> keySpace, "table" -> tableName))
  .save()

I am getting the error below:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)

I think this is related to the line .agg(collect_list(map($"key", $"value")).as("attributes")) in the code.

Here, everything in the map is of type <String, String>.

I am not able to solve this. Can someone please assist?
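
For reference, a minimal sketch (made-up toy data, same column names, run against a SparkSession with spark.implicits._ in scope) reproduces the unexpected shape: collect_list(map(key, value)) builds an array of one-entry maps rather than a single map, which is what the connector then fails to convert into (key, value) pairs for the CQL map column.

import org.apache.spark.sql.functions._
import spark.implicits._

val toy = Seq(
  ("p1", "r1", "cli_rel_typ_c_00001", "01"),
  ("p1", "r1", "cli_rel_typ_c_00002", "02")
).toDF("partition_key", "row_key", "key", "value")

toy.groupBy($"partition_key", $"row_key")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .printSchema()
// attributes: array (element: map<string,string>)
// i.e. an array of one-entry maps, not the map<string,string> a CQL map<text,text> column expects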

The output DataFrame schema is as below (which is not what I expected):

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- group: int (nullable = true)
 |-- attributes: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

The expected output DataFrame schema is as below:

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- group: int (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

The question leading up to this one is Divide Spark DataFrame rows into multiple rows depending on the size of Map type column.


Solution

  • I referred to this post: How to use groupBy to collect rows into a map?

    I was able to do it using the flatten and toMap functions.

    The updated working code is below:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    // $-syntax assumes import spark.implicits._ from an active SparkSession is in scope

    // Merges the per-group list of single-entry maps into one map per row
    val joinMap = udf {
      values: Seq[Map[String, String]] => values.flatten.toMap
    }

    def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {

      val maxItem = 65000

      val dfPreFinal =
        dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
          .withColumn("group", $"pos".divide(maxItem).cast("int"))
          .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
          .agg(collect_list(map($"key", $"value")).as("attributes"))
          .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")

      val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))

      dfFinal.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(Map("keyspace" -> keySpace, "table" -> tableName))
        .save()
    }
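
    For clarity, the UDF just merges the per-group list of one-entry maps into a single map; on plain Scala collections (toy values) it behaves like this:

    Seq(Map("a" -> "1"), Map("b" -> "2")).flatten.toMap
    // Map(a -> 1, b -> 2)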
    

    Is there a better way (maybe without a UDF)?
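
    One possible UDF-free alternative (a sketch, not part of the original answer, assuming Spark 2.4+ where map_from_entries is available): collect the key/value pairs as structs and build the map with the built-in map_from_entries.

    // Sketch, assumes Spark 2.4+; replaces both collect_list(map(...)) and the joinMap UDF
    dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
      .withColumn("group", $"pos".divide(maxItem).cast("int"))
      .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
      .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
      .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")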