apache-spark, google-cloud-platform, google-cloud-dataproc

DataProc Avro Version Causing Error on Image v1.0.0


We are running a few Dataproc jobs with Dataproc image 1.0 and spark-redshift.

We have two clusters, here are some details:

  • Cluster A -> Runs a PySpark streaming job; last created 2016 Jul 15, 11:27:12 AEST.
  • Cluster B -> Runs PySpark batch jobs; the cluster is created every time a job is run and torn down afterwards.
  • A & B run the same code base, use the same init script, the same node types, etc.

Since sometime last Friday (2016-08-05 AEST), our code has stopped working on Cluster B with the following error, while Cluster A keeps running without issues.

The following code reproduces the issue on Cluster B (or on any new cluster with image v1.0.0), while it runs fine on Cluster A.

Sample PySpark Code:

from pyspark import SparkContext, SQLContext
sc = SparkContext()
sql_context = SQLContext(sc)

# Minimal one-row DataFrame to write out to Redshift.
rdd = sc.parallelize([{'user_id': 'test'}])
df = rdd.toDF()

# S3 credentials for the Redshift tempdir.
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "FOO")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "BAR")

df\
    .write\
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://foo.ap-southeast-2.redshift.amazonaws.com/bar") \
    .option("dbtable", 'foo') \
    .option("tempdir", "s3n://bar") \
    .option("extracopyoptions", "TRUNCATECOLUMNS") \
    .mode("append") \
    .save()

The above code fails in both of the following situations on Cluster B, while running fine on Cluster A. Note that RedshiftJDBC41-1.1.10.1010.jar is installed by the cluster init script.

  • Running in interactive mode on the master node:

    PYSPARK_DRIVER_PYTHON=ipython pyspark \
        --verbose \
        --master "local[*]" \
        --jars /usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
        --packages com.databricks:spark-redshift_2.10:1.0.0
    
  • Submitting the job via gcloud dataproc:

    gcloud --project foo \
       dataproc jobs submit pyspark \
       --cluster bar \
       --properties ^#^spark.jars.packages=com.databricks:spark-redshift_2.10:1.0.0#spark.jars=/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
       foo.bar.py
    

The error it produces (Trace):

2016-08-08 06:12:23 WARN  TaskSetManager:70 - Lost task 6.0 in stage 45.0 (TID 121275, foo.bar.internal): 
    java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
    at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
    at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
    at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2016-08-08 06:12:24 ERROR YarnScheduler:74 - Lost executor 63 on kinesis-ma-sw-o7he.c.bupa-ma.internal: Container marked as failed: container_1470632577663_0003_01_000065 on host: kinesis-ma-sw-o7he.c.bupa-ma.internal. Exit status: 50. Diagnostics: Exception from container-launch.
Container id: container_1470632577663_0003_01_000065
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
    at org.apache.hadoop.util.Shell.run(Shell.java:456)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

spark-redshift 1.0.0 requires com.databricks:spark-avro 2.0.1, which in turn requires org.apache.avro:avro 1.7.6.
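
As a further check, the call that blows up in the trace can be exercised from a pyspark session on the master via py4j. This is only a rough driver-side probe (the job above fails on the executors) and assumes a live SparkContext named sc:

jvm = sc._jvm

# Parse a trivial schema (Schema.parse is deprecated but still present in Avro 1.7.x),
# then ask GenericData for a DatumWriter -- the same call the stack trace fails on.
# If the Avro that actually got loaded is too old (e.g. 1.7.4), py4j cannot find
# createDatumWriter and raises an error, mirroring the NoSuchMethodError above.
schema = jvm.org.apache.avro.Schema.parse('{"type": "string"}')
try:
    jvm.org.apache.avro.generic.GenericData.get().createDatumWriter(schema)
    print("createDatumWriter(Schema) is available")
except Exception as err:
    print("createDatumWriter(Schema) is missing: %s" % err)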

Upon checking the version of org.apache.avro.generic.GenericData on Cluster A:

root@foo-bar-m:/home/foo# spark-shell \
>     --verbose \
>     --master "local[*]" \
>     --deploy-mode client \
>     --packages com.databricks:spark-redshift_2.10:1.0.0 \
>     --jars "/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar"

It produces the following:

scala> import org.apache.avro.generic._
import org.apache.avro.generic._

scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@496a514f

scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.5-hadoop2.jar <no signer certificates>)

While running the same command on Cluster B:

scala> import org.apache.avro.generic._
import org.apache.avro.generic._

scala> val c = GenericData.get()
c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@72bec302

scala> c.getClass.getProtectionDomain().getCodeSource()
res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.7-hadoop2.jar <no signer certificates>)
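
The same lookup can be made from a pyspark session via py4j, which is closer to how our jobs actually run. A rough sketch, again assuming a live SparkContext named sc:

jvm = sc._jvm

# Ask the driver JVM which jar GenericData was actually loaded from,
# mirroring the spark-shell check above.
generic_data = jvm.java.lang.Class.forName("org.apache.avro.generic.GenericData")
source = generic_data.getProtectionDomain().getCodeSource()
print(source.getLocation().toString() if source is not None else "<bootstrap>")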

Screenshot of the environment on Cluster B (apologies for all the redactions). We've tried the methods described here and here, without any success.

This is really frustrating, as Dataproc updates the image content without bumping the release version, which is the complete opposite of immutable releases. Now our code is broken and there is no way for us to roll back to the previous version.


Solution

  • Sorry for the trouble! It's certainly not intended for breaking changes to occur within an image version. Note that subminor versions are rolled out "under the hood" for non-breaking bug fixes and Dataproc-specific patches.

    You can revert to using the 1.0.* version from before last week by specifying --image-version 1.0.8 when deploying clusters from the command line:

    gcloud dataproc clusters create <cluster-name> --image-version 1.0.8
    

    Edit: For additional clarification, we've investigated the Avro versions in question and verified that the Avro version numbers did not actually change in any recent subminor Dataproc release. The core issue is that Hadoop itself has had a latent bug where it brings avro-1.7.4 under /usr/lib/hadoop/lib/ while Spark uses avro-1.7.7. Coincidentally, Google's BigQuery connector also uses avro-1.7.7, but this turns out to be orthogonal to the known Spark/Hadoop problem with 1.7.4 vs 1.7.7. The recent image update was deemed non-breaking because the versions in fact did not change, but classloading order changed in a nondeterministic way: Hadoop's bad Avro version used to be hidden from the Spark job by pure luck, and is no longer accidentally hidden in the latest image (the sketch at the end of this answer shows one way to inspect that ordering).

    Dataproc's preview image currently includes a fix to the Avro version in the Hadoop layer, which should make it into any future Dataproc 1.1 release when it comes out; you might want to consider trying the preview version to see whether Spark 2.0 is a seamless transition for you.
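
    If you want to see the ordering problem for yourself, one rough way (sketched from a pyspark session, assuming a live SparkContext named sc) is to list every copy of GenericData visible to the driver's classloader. With both Hadoop's avro-1.7.4 and Spark's avro-1.7.7 on the classpath, whichever jar appears first is typically the copy that wins:

    jvm = sc._jvm

    # List every jar that contains org.apache.avro.generic.GenericData, in the order
    # the classloader sees them; the first entry is typically the copy that gets loaded.
    loader = jvm.java.lang.Thread.currentThread().getContextClassLoader()
    copies = loader.getResources("org/apache/avro/generic/GenericData.class")
    while copies.hasMoreElements():
        print(copies.nextElement().toString())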