I have a Spark DataFrame with the following schema:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
I am writing it to a Cassandra table.
The Cassandra table schema is as follows:
create table provision_bmss.bmss_cust (
    partition_key text,
    row_key text,
    group int,
    attributes map<text, text>,
    data_as_of_date text,
    PRIMARY KEY (partition_key, row_key, group)
)
WITH cdc = false
AND default_time_to_live = 34560000;
I am using the Spark Cassandra Connector (DataStax) to write to the table, with the following logic:
val maxItem = 65000

// posexplode produces one row per map entry (pos, key, value);
// group = pos / 65000 keeps each row's collection under Cassandra's ~64K items-per-collection limit
dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map("keyspace" -> keySpace, "table" -> tableName))
  .save()
I am getting the below error:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Map(cli_rel_typ_c_00001 -> 01) of type class scala.collection.immutable.Map$Map1 to (AnyRef, AnyRef)
I think this is caused by the line .agg(collect_list(map($"key", $"value")).as("attributes")) in the code above.
All keys and values in the map are of type String.
I have not been able to resolve this. Can someone please assist?
The resulting DataFrame schema is as below (which is not what I expected):
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
The expected output DataFrame schema is as below:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- group: int (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
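For illustration, here is a minimal sketch (made-up data, run in spark-shell) that reproduces the mismatch: collect_list(map(...)) wraps each row's single-entry map in an array instead of merging the entries into one map.

import spark.implicits._   // already in scope in spark-shell
import org.apache.spark.sql.functions.{collect_list, map}

// Hypothetical two-attribute example for one (partition_key, row_key) pair
val demo = Seq(
  ("p1", "r1", "k1", "v1"),
  ("p1", "r1", "k2", "v2")
).toDF("partition_key", "row_key", "key", "value")

demo.groupBy($"partition_key", $"row_key")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .printSchema()
// attributes comes out as array<map<string,string>> rather than map<string,string>,
// which the connector cannot convert to the Cassandra map<text,text> column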
This is a follow-up to my earlier question: Divide Spark DataFrame rows into multiple rows depending on the size of Map type column
I referred to this post: How to use groupBy to collect rows into a map?
I was able to get this working using the flatten and toMap functions.
The updated working code is below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Merge the collected single-entry maps into one map per group
val joinMap = udf { values: Seq[Map[String, String]] => values.flatten.toMap }

def writeToCassandra(dataFrame: DataFrame, keySpace: String, tableName: String) = {
  val maxItem = 65000

  val dfPreFinal =
    dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
      .withColumn("group", $"pos".divide(maxItem).cast("int"))
      .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
      .agg(collect_list(map($"key", $"value")).as("attributes"))
      .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")

  // Collapse array<map<string,string>> back into a single map<string,string>
  val dfFinal = dfPreFinal.withColumn("attributes", joinMap(col("attributes")))

  dfFinal.write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(Map("keyspace" -> keySpace, "table" -> tableName))
    .save()
}
Is there a better way to do this (perhaps without a UDF)?
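One option I have seen suggested, but have not verified on my cluster, is to build the map with the built-in map_from_entries instead of a UDF. This assumes Spark 2.4+ (where map_from_entries is available) and reuses the same dataFrame, maxItem, keySpace and tableName as above:

import org.apache.spark.sql.functions.{posexplode, collect_list, struct, map_from_entries}

dataFrame.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  // struct($"key", $"value") keeps each entry as a (key, value) pair;
  // map_from_entries turns the collected array of pairs into a single map<string,string>
  .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
  .select($"partition_key", $"row_key", $"group", $"attributes", $"data_as_of_date")
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .options(Map("keyspace" -> keySpace, "table" -> tableName))
  .save()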