We are running a few Dataproc jobs with Dataproc image 1.0 and spark-redshift.
We have two clusters, here are some details:
2016. Jul 15. 11:27:12 AEST
Since sometime last Friday (2016-08-05 AEST), our code stopped working on Cluster B with the following error, while Cluster A is running without issues.
The following code can reproduce the issue on Cluster B (or any new cluster with image v1.0.0) while it runs fine on cluster A.
Sample PySpark Code:
from pyspark import SparkContext, SQLContext
sc = SparkContext()
sql_context = SQLContext(sc)
rdd = sc.parallelize([{'user_id': 'test'}])
df = rdd.toDF()
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "FOO")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "BAR")
df\
.write\
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://foo.ap-southeast-2.redshift.amazonaws.com/bar") \
.option("dbtable", 'foo') \
.option("tempdir", "s3n://bar") \
.option("extracopyoptions", "TRUNCATECOLUMNS") \
.mode("append") \
.save()
The above code fails in both of the following situations on Cluster B, while running fine on Cluster A. Note that RedshiftJDBC41-1.1.10.1010.jar is created by the cluster init script.
Running in interactive mode on master node:
PYSPARK_DRIVER_PYTHON=ipython pyspark \
--verbose \
--master "local[*]" \
--jars /usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
--packages com.databricks:spark-redshift_2.10:1.0.0
Submitting the job via gcloud dataproc:
gcloud --project foo \
dataproc jobs submit pyspark \
--cluster bar \
--properties ^#^spark.jars.packages=com.databricks:spark-redshift_2.10:1.0.0#spark.jars=/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
foo.bar.py
The error it produces (Trace):
2016-08-08 06:12:23 WARN TaskSetManager:70 - Lost task 6.0 in stage 45.0 (TID 121275, foo.bar.internal):
java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-08-08 06:12:24 ERROR YarnScheduler:74 - Lost executor 63 on kinesis-ma-sw-o7he.c.bupa-ma.internal: Container marked as failed: container_1470632577663_0003_01_000065 on host: kinesis-ma-sw-o7he.c.bupa-ma.internal. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1470632577663_0003_01_000065
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
at org.apache.hadoop.util.Shell.run(Shell.java:456)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
SparkRedshift:1.0.0 requires com.databricks.spark-avro:2.0.1, which requires org.apache.avro:1.7.6.
Upon checking the version of org.apache.avro.generic.GenericData on Cluster A:
root@foo-bar-m:/home/foo# spark-shell \
> --verbose \
> --master "local[*]" \
> --deploy-mode client \
> --packages com.databricks:spark-redshift_2.10:1.0.0 \
> --jars "/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar"
It produces (Trace):
scala> import org.apache.avro.generic._
import org.apache.avro.generic._
scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@496a514f
scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.5-hadoop2.jar <no signer certificates>)
While running the same command on Cluster B:
scala> import org.apache.avro.generic._
import org.apache.avro.generic._
scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@72bec302
scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.7-hadoop2.jar <no signer certificates>)
Screenshot of the environment on Cluster B (apologies for all the redactions). We've tried the methods described here and here without any success.
This is really frustrating: Dataproc updates the image content without bumping the release version, which is the complete opposite of immutable releases. Now our code is broken and there is no way for us to roll back to the previous version.
Sorry for the trouble! It's certainly not intended for breaking changes to occur within an image version. Note that subminor versions are rolled out "under the hood" for non-breaking bug fixes and Dataproc-specific patches.
You can revert to using the 1.0.* version from before last week by simply specifying --image-version 1.0.8 when deploying clusters from the command line:
gcloud dataproc clusters create --image-version 1.0.8
Edit: For additional clarification, we've investigated the Avro versions in question and verified that Avro version numbers did not actually change in any recent subminor Dataproc release. The core issue is a latent bug in Hadoop: Hadoop brings avro-1.7.4 under /usr/lib/hadoop/lib/ while Spark uses avro-1.7.7. Coincidentally, Google's BigQuery connector also uses avro-1.7.7, but this turns out to be orthogonal to the known Spark/Hadoop problem with 1.7.4 vs 1.7.7. The recent image update was deemed non-breaking because versions in fact did not change, but classloading order changed in a nondeterministic way: Hadoop's bad Avro version used to be hidden from the Spark job by pure luck, and is no longer accidentally hidden in the latest image.
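To see which jars on a node could supply org.apache.avro.generic.GenericData, one option is to scan the jar directories directly. The following is only a rough sketch: the helper name jars_containing is made up for illustration, /usr/lib/hadoop/lib comes from the discussion above, and /usr/lib/spark/lib is an assumed location that may differ on your image.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, directories):
    """Return paths of jars under `directories` that bundle `class_name`.

    `jars_containing` is a hypothetical helper, not part of Dataproc or Spark.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for directory in directories:
        # glob() yields nothing if the directory does not exist
        for jar in sorted(Path(directory).glob("*.jar")):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if entry in archive.namelist():
                        hits.append(str(jar))
            except (zipfile.BadZipFile, OSError):
                # Skip broken symlinks and files that are not valid archives
                continue
    return hits


if __name__ == "__main__":
    # Assumed directories; adjust to wherever your image keeps its jars
    search_dirs = ["/usr/lib/hadoop/lib", "/usr/lib/spark/lib"]
    for path in jars_containing("org.apache.avro.generic.GenericData", search_dirs):
        print(path)
```

If more than one jar is listed, the class that a job actually loads depends on classpath order, which is consistent with the behaviour described above. This only reports which jars bundle the class; it does not tell you which one wins at runtime.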
Dataproc's preview image currently includes a fix to the Avro version in the Hadoop layer, which should make it into any future Dataproc 1.1 version when it comes out; you might want to consider trying the preview version to see if Spark 2.0 is a seamless transition.