Search code examples
scalaapache-sparkruntime-error

java.lang.String is not a valid external type for schema of int error in creating spark dataframe


I just tried to make dataframe with spark. I just tried to made codes as like below.

First, I imported as like below

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._
import org.apache.spark.sql.types.IntegerType`

and then, I tried to make Row and Schema for dataframe like below.

val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))

val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))

val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))

Finally, I tried to see is the dataframe is alright with using

EmpDF.show

and I got the errors like below

    ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
    java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
    java.lang.String is not a valid external type for schema of int
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, Name), StringType), true, false) AS Name#0
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, City), StringType), true, false) AS City#1
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, Salary), IntegerType) AS Salary#2
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
        ... 25 more
20/07/12 16:32:51 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, Name), StringType), true, false) AS Name#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, City), StringType), true, false) AS City#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, Salary), IntegerType) AS Salary#2
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)  

How can I fix this problem?

I have already tried to import as like below

    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.serializer.Serializer

But now it shows the error with

ERROR Executor: Exception in task 2.0 in stage 5.0 (TID 13)

Solution

  • Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int is due to the Type mismatch between defined schema and the actual data "Jeong","Daejon","3400000" -> (string,string,string). but you specified in the schema as (String,String,Integer).

    updated code 1 in specific to integer type:

    import org.apache.spark.sql.types._
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    import scala.collection.mutable.HashMap
    import java.io.File
    import org.apache.spark.sql.Row
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.util._
    import org.apache.spark.sql.types._
    
    val Employee = Seq(Row("Kim","Seoul",1000000),Row("Lee","Busan",2000000),Row("Park","Jeju",3000000),Row("Jeong","Daejon",3400000))
    
    val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))
    
    val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
    EmpDF.show()
    /*+-----+------+-------+
    | Name|  City| Salary|
    +-----+------+-------+
    |  Kim| Seoul|1000000|
    |  Lee| Busan|2000000|
    | Park|  Jeju|3000000|
    |Jeong|Daejon|3400000|
    +-----+------+-------+*/
    

    updated code in specific to string type:

    import org.apache.spark.sql.types._
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    import scala.collection.mutable.HashMap
    import java.io.File
    import org.apache.spark.sql.Row
    import scala.collection.mutable.ListBuffer
    import org.apache.spark.util._
    import org.apache.spark.sql.types._
    
    val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))
    
    val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", StringType, true))
    
    val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
    EmpDF.show()
    /*+-----+------+-------+
    | Name|  City| Salary|
    +-----+------+-------+
    |  Kim| Seoul|1000000|
    |  Lee| Busan|2000000|
    | Park|  Jeju|3000000|
    |Jeong|Daejon|3400000|
    +-----+------+-------+*/