I'm trying to save a DataFrame with a MapType column to ClickHouse (whose schema also has a Map-type column), using the clickhouse-native-jdbc driver, and I ran into this error:
Caused by: java.lang.IllegalArgumentException: Can't translate non-null value for field 74
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16(JdbcUtils.scala:593)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16$adapted(JdbcUtils.scala:591)
I found the corresponding place in the Spark source code, and it contains this:
private def makeSetter(
    conn: Connection,
    dialect: JdbcDialect,
    dataType: DataType): JDBCValueSetter = dataType match {
  case IntegerType =>
    (stmt: PreparedStatement, row: Row, pos: Int) =>
      stmt.setInt(pos + 1, row.getInt(pos))

  case LongType =>
    (stmt: PreparedStatement, row: Row, pos: Int) =>
      stmt.setLong(pos + 1, row.getLong(pos))

  ...

  case _ =>
    (_: PreparedStatement, _: Row, pos: Int) =>
      throw new IllegalArgumentException(
        s"Can't translate non-null value for field $pos")
This function matches on the column type and, if no case matches, throws this error. So as far as I can see, Spark can't deal with MapType columns at all.
I tried to copy and modify the org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils file to make it handle MapType columns like this:
case MapType(_, _, _) =>
  (stmt: PreparedStatement, row: Row, pos: Int) =>
    // convert the Scala Map to a java.util.Map so the JDBC driver can bind it
    // (mapAsJavaMap comes from scala.collection.JavaConversions;
    //  map.asJava via scala.collection.JavaConverters._ works as well)
    val map = row.getMap[AnyRef, AnyRef](pos)
    stmt.setObject(pos + 1, mapAsJavaMap(map))
On my local machine this works as expected, but in cluster mode the executors use the stock version instead of my modified one.
Does anybody have an idea how to make Spark work with MapType columns in another way, or how to make the executors use the modified source code?
Thanks to Danilo Rodrigues for the inspiration; I finally solved my issue this way: instead of writing the Map value as-is, I transform it to a JSON string (the Spark side is sketched after the schema below), and my table schema in ClickHouse now looks like this:
CREATE TABLE t1 (
param_str String,
param MATERIALIZED cast((arrayMap(x->x.1, JSONExtractKeysAndValues(param_str, 'String')), arrayMap(x->x.2, JSONExtractKeysAndValues(param_str, 'String'))), 'Map(String, String)')
) Engine ...
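On the Spark side the job boils down to something like the following rough sketch (not my code verbatim): it assumes Spark 2.4+, where to_json accepts MapType columns, and the JDBC URL, driver class, and table/column names are placeholders to adapt.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_json

val spark = SparkSession.builder().appName("map-to-clickhouse").getOrCreate()
import spark.implicits._

// sample DataFrame with a MapType column
val df = Seq(Map("k1" -> "v1", "k2" -> "v2")).toDF("param")

// serialize the map into a JSON string; the MATERIALIZED column in ClickHouse
// rebuilds the Map(String, String) from this string on insert
val jsonDf = df.select(to_json($"param").alias("param_str"))

// write through the JDBC data source (all options here are placeholders)
jsonDf.write
  .format("jdbc")
  .option("driver", "com.github.housepower.jdbc.ClickHouseDriver")
  .option("url", "jdbc:clickhouse://127.0.0.1:9000")
  .option("dbtable", "t1")
  .mode("append")
  .save()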
Yes, it looks a bit ugly, and I'd have preferred the approach of changing the Spark source code, but the current approach works well.