apache-spark, apache-spark-sql, mapr

Spark DataFrame timestamp column inferred as InvalidType from MapR DB table


I am reading a table from MapR DB with Spark, but the timestamp column is inferred as InvalidType. There is also no option to set a schema when reading data from MapR DB.

root
 |-- Name: string (nullable = true)
 |-- dt: struct (nullable = true)
 |    |-- InvalidType: string (nullable = true)

I tried to cast the column to a timestamp, but got the exception below.

import com.mapr.db.spark.sql._
import org.apache.spark.sql.types.TimestampType
import spark.implicits._
val df = spark.loadFromMapRDB("path")
df.withColumn("dt1", $"dt"("InvalidType").cast(TimestampType))
  .drop("dt")
  .show(5, false)

com.mapr.db.spark.exceptions.SchemaMappingException: Schema cannot be inferred for the column {dt}
  at com.mapr.db.spark.sql.utils.MapRSqlUtils$.convertField(MapRSqlUtils.scala:250)
  at com.mapr.db.spark.sql.utils.MapRSqlUtils$.convertObject(MapRSqlUtils.scala:64)
  at com.mapr.db.spark.sql.utils.MapRSqlUtils$.convertRootField(MapRSqlUtils.scala:48)
  at com.mapr.db.spark.sql.utils.MapRSqlUtils$$anonfun$documentsToRow$1.apply(MapRSqlUtils.scala:34)
  at com.mapr.db.spark.sql.utils.MapRSqlUtils$$anonfun$documentsToRow$1.apply(MapRSqlUtils.scala:33)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Any help will be appreciated.


Solution

  • If you know the schema of the table, you can define a case class that describes it and then load the table as a Dataset of that case class; a sketch follows below.

    Go through this link: Loading Data from MapR Database as an Apache Spark Dataset

    Also check the table in MapR DB to confirm that the column in question actually holds valid timestamp values.
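
A minimal sketch of the case-class approach, following the pattern in the linked documentation. The Record class, its fields, and the "/path/to/table" path are placeholders based on the schema shown in the question, not names from the original post:

import java.sql.Timestamp
import com.mapr.db.spark.sql._   // provides loadFromMapRDB on SparkSession
import spark.implicits._         // encoder for the case class

// Placeholder case class describing the expected document layout;
// declaring dt as java.sql.Timestamp tells the connector the target type.
case class Record(_id: String, Name: String, dt: Timestamp)

// Loading with an explicit type avoids relying on schema inference for dt.
val ds = spark.loadFromMapRDB[Record]("/path/to/table").as[Record]
ds.printSchema()
ds.show(5, false)

Note that if the stored values in dt are not actual OJAI timestamps, the typed load can still fail at runtime, which is why verifying the data in the table itself is the other half of the answer.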