In Spark 1.6.1 (code is in Scala 2.10), I am trying to write a DataFrame to a Parquet file:
import sc.implicits._
val triples = file.map(p => _parse(p, " ", true)).toDF()
triples.write.mode(SaveMode.Overwrite).parquet("hdfs://some.external.ip.address:9000/tmp/table.parquet")
When I run it in development mode, everything works fine. It also works if I set up a master and one worker in standalone mode in a Docker environment (separate Docker containers) on the same machine. It fails when I try to execute it on a cluster (1 master, 5 workers). If I run it locally on the master, it also works.
When I execute it on the cluster, I get the following stack trace:
{
"duration": "18.716 secs",
"classPath": "LDFSparkLoaderJobTest2",
"startTime": "2016-07-18T11:41:03.299Z",
"context": "sql-context",
"result": {
"errorClass": "org.apache.spark.SparkException",
"cause": "Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, curry-n3): java.lang.NullPointerException
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.abortTask$1(WriterContainer.scala:294)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:271)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:",
"stack":[
"org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)",
"org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)",
"org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)",
"scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)",
"scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)",
"org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)",
"org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)",
"org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)",
"scala.Option.foreach(Option.scala:236)",
"org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)",
"org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)",
"org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)",
"org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)",
"org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)",
"org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)",
"org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)",
"org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)",
"org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)",
"org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)",
"org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)",
"org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)",
"org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)",
"org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)",
"org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)",
"org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)",
"org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)",
"org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)",
"org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)",
"org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)",
"org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)",
"org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)",
"org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)",
"org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)",
"org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)",
"org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)",
"org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)",
"LDFSparkLoaderJobTest2$.readFile(SparkLoaderJob.scala:55)",
"LDFSparkLoaderJobTest2$.runJob(SparkLoaderJob.scala:48)",
"LDFSparkLoaderJobTest2$.runJob(SparkLoaderJob.scala:18)",
"spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:268)",
"scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)",
"scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)",
"java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
"java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)",
"java.lang.Thread.run(Thread.java:745)"
],
"causingClass": "org.apache.spark.SparkException",
"message": "Job aborted."
},
"status": "ERROR",
"jobId": "54ad3056-3aaa-415f-8352-ca8c57e02fe9"
}
Answer:
In your standalone setup, only one worker is writing with ParquetRecordWriter, so it works fine.
On the real cluster (1 master, 5 workers), it fails because multiple workers are writing concurrently with ParquetRecordWriter.
Please try the following, which uses SaveMode.Append instead of SaveMode.Overwrite:
import sc.implicits._
val triples = file.map(p => _parse(p, " ", true)).toDF()
triples.write.mode(SaveMode.Append).parquet("hdfs://some.external.ip.address:9000/tmp/table.parquet")
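For reference, a self-contained version of the snippet above might look like the sketch below. It assumes the Spark 1.6 API (SQLContext with sqlContext.implicits._ and org.apache.spark.sql.SaveMode). The Triple case class, the parseLine helper, the sample input, the local[2] master and the /tmp output path are hypothetical placeholders, since the original _parse function and file RDD are not shown. It only illustrates the Append call and has not been checked against the failing cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

// Hypothetical record type; the output type of the original _parse is not shown.
// Defined at top level so that toDF() can derive the schema.
case class Triple(subject: String, predicate: String, obj: String)

object AppendParquetSketch {
  // Hypothetical stand-in for _parse: splits a line into exactly three fields.
  // Lines with fewer than three fields would throw here.
  def parseLine(line: String, sep: String): Triple = {
    val parts = line.split(sep, 3)
    Triple(parts(0), parts(1), parts(2))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master for illustration; on a cluster the master
    // is normally supplied by spark-submit or the job server.
    val conf = new SparkConf().setAppName("append-parquet-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope for RDDs of case classes

    // Hypothetical sample input standing in for the original `file` RDD.
    val file = sc.parallelize(Seq("s1 p1 o1", "s2 p2 o2"))
    val triples = file.map(line => parseLine(line, " ")).toDF()

    // Append adds new part files to the target directory instead of replacing it.
    // The path is a placeholder; substitute the HDFS URI from the question.
    triples.write.mode(SaveMode.Append).parquet("/tmp/table.parquet")

    sc.stop()
  }
}
```

Note that with SaveMode.Append, running the job repeatedly accumulates rows in the same Parquet directory, so the target may need to be cleared between runs if the original intent was to replace the table.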