scala, apache-spark, impala, apache-kudu

Spark dataframe cast column for Kudu compatibility


(I am new to Spark, Impala and Kudu.) I am trying to copy a table from an Oracle DB to an Impala table with the same structure, using Spark and Kudu. I get an error when the code tries to map an Oracle NUMBER to a Kudu data type. How can I change the data type of a Spark DataFrame column to make it compatible with Kudu?

This is intended to be a 1-to-1 copy of data from Oracle to Impala. I have extracted the Oracle schema of the source table and created a target Impala table with the same structure (same column names and a reasonable mapping of data types). I was hoping that Spark+Kudu would map everything automatically and just copy the data. Instead, Kudu complains that it cannot map DecimalType(38,0).

I would like to specify that "column #1, with name SOME_COL, which is a NUMBER in Oracle, should be mapped to a LongType, which is supported in Kudu".

How can I do that?

// This works
val df: DataFrame = spark.read
  .option("fetchsize", 10000)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)

// This does not work  
kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")
// Error: No support for Spark SQL type DecimalType(38,0)
// See https://github.com/cloudera/kudu/blob/master/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala

// So let's see the Spark data types
df.dtypes.foreach{case (colName, colType) => println(s"$colName: $colType")}
// Spark  data type: SOME_COL DecimalType(38,0)
// Oracle data type: SOME_COL NUMBER -- no precision specifier; values are int/long
// Kudu   data type: SOME_COL BIGINT
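
For the single offending column, an explicit cast after the read would presumably also coerce the type (a sketch, assuming the NUMBER values actually fit into a signed 64-bit long; overflowing values would typically come back as null in non-ANSI mode):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// Cast the DECIMAL(38,0) column to a Kudu-compatible LongType before the insert.
// SOME_COL, colNamesLower and kuduContext are the same names used above.
val dfCast = df.withColumn("SOME_COL", col("SOME_COL").cast(LongType))
kuduContext.insertRows(dfCast.toDF(colNamesLower: _*), "impala::schema.table_name")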

Solution

  • Apparently, we can specify a custom schema when reading from a JDBC data source, via the customSchema option:

    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    

    That worked. I was able to specify a customSchema like so:

    col1 Long, col2 Timestamp, col3 Double, col4 String
    

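    That string presumably has to reach the JDBC reader the same way as in the documentation snippet, i.e. through the connection properties passed to .jdbc. A sketch of how the entry could be set on props (the Properties object handed to .jdbc below):

    import java.util.Properties

    // Sketch (assumption): add the customSchema entry to the same Properties
    // object (props) that is later passed to spark.read...jdbc(...).
    val props = new Properties()
    props.put("customSchema", "col1 Long, col2 Timestamp, col3 Double, col4 String")
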
    and with that, the code works:

    import spark.implicits._
    val df: Dataset[case_class_for_table] = spark.read
      .option("fetchsize", 10000)
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)
      .as[case_class_for_table]
    kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")
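
    Re-running the dtypes check from the question on this DataFrame should confirm the mapping (the expected output below is an assumption based on the customSchema above):

    // With customSchema in place, the former DECIMAL(38,0) column should now
    // surface as a LongType, which kudu-spark can map to a Kudu BIGINT.
    df.dtypes.foreach { case (colName, colType) => println(s"$colName: $colType") }
    // e.g. col1: LongType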