Tags: java, apache-spark, apache-spark-sql, parquet, mapr

Spark SQL Java GenericRowWithSchema cannot be cast to java.lang.String


I have an application that attempts to read a group of CSV files from a cluster directory and write them out as a Parquet file using Spark.

    SparkSession sparkSession = createSession();
    JavaRDD<Row> entityRDD = sparkSession.read()
            .csv(dataCluster + "measures/measures-*.csv")
            .javaRDD()
            .mapPartitionsWithIndex(removeHeader, false)
            .map((Function<String, Measure>) s -> {
                String[] parts = s.split(COMMA);
                Measure measure = new Measure();
                measure.setCobDate(parts[0]);
                measure.setDatabaseId(Integer.valueOf(parts[1]));
                measure.setName(parts[2]);

                return measure;
            });

    Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
    entityDataFrame.printSchema();

    //Create parquet file here
    String parquetDir = dataCluster + "measures/parquet/measures";
    entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);

    sparkSession.stop();

The Measure class is a simple POJO that implements Serializable. The schema prints as expected, so the problem must be in translating the DataFrame entries into the Parquet file. Here's the error I get:

Lost task 2.0 in stage 1.0 (TID 3, redlxd00006.fakepath.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:244)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
        ... 8 more

Ultimately my intention is to use Spark SQL to filter and join this data with other CSVs containing other table data, and write the entire result to Parquet. I've only found Scala-related questions, which haven't addressed my problem. Any help is much appreciated.

csv:

cob_date, database_id, name
20181115,56459865,name1
20181115,56652865,name6
20181115,56459845,name32
20181115,15645936,name3

Solution

The cast in the map step is the culprit. sparkSession.read().csv(...) returns a Dataset&lt;Row&gt;, so after .javaRDD() the RDD contains Row objects, not raw Strings, and the cast in

    .map((Function<String, Measure>) s -> {

fails at runtime with the ClassCastException above. It should be

    .map((Function<Row, Measure>) s -> {

with the body reading fields through the Row accessors (e.g. s.getString(0)) instead of splitting a string.
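For concreteness, here's a minimal sketch of the corrected pipeline, built only from the code in the question. It assumes the Measure POJO from the question, and that removeHeader is adjusted to iterate over Rows (a Function2&lt;Integer, Iterator&lt;Row&gt;, Iterator&lt;Row&gt;&gt;); alternatively, drop that step and let the reader skip the header via .option("header", "true"). Note the RDD's declared element type also becomes Measure, since createDataFrame(entityRDD, Measure.class) expects an RDD of the bean class:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    // read().csv() has already split each line into columns, so the
    // mapper receives Row objects and reads fields by position.
    JavaRDD<Measure> entityRDD = sparkSession.read()
            .csv(dataCluster + "measures/measures-*.csv")
            .javaRDD()
            .mapPartitionsWithIndex(removeHeader, false) // must now iterate over Rows, not Strings
            .map((Function<Row, Measure>) row -> {
                Measure measure = new Measure();
                measure.setCobDate(row.getString(0));
                measure.setDatabaseId(Integer.valueOf(row.getString(1)));
                measure.setName(row.getString(2));
                return measure;
            });

    Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
    entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);

With untyped csv() reads, every column is a string by default, so getString(n) works for all three fields and the numeric one is parsed explicitly with Integer.valueOf.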