apache-spark, google-cloud-platform, bigdata, google-cloud-dataproc

PySpark Job fails with workflow template


Following up on this question, I decided to try the workflow templates API.

Here's what it looks like:

gcloud beta dataproc workflow-templates create lifestage-workflow --region europe-west2
gcloud beta dataproc workflow-templates set-managed-cluster lifestage-workflow \
  --master-machine-type n1-standard-8 \
  --worker-machine-type n1-standard-16 \
  --num-workers 6 \
  --cluster-name lifestage-workflow-cluster \
  --initialization-actions gs://..../init.sh \
  --zone europe-west2-b \
  --region europe-west2

gcloud beta dataproc workflow-templates add-job pyspark gs://.../main.py \
  --step-id prediction \
  --region europe-west2 \
  --workflow-template lifestage-workflow \
  --jars gs://.../custom.jar \
  --py-files gs://.../jobs.zip,gs://.../config.ini \
  -- --job predict --conf config.ini

The template is created correctly. The job works when I submit it manually to one of my existing clusters. It also runs when I point the workflow at an existing cluster instead of asking it to create one.

The thing is, I want the cluster to be created just before the job runs and deleted right after it finishes. That's why I'm using a managed cluster.

But with the managed cluster I just can't make it run. I tried to use the same configuration as my existing clusters but it doesn't change anything. I always get the same error.

Any idea why my job runs perfectly except when it is run on a cluster generated by the workflow?


Solution

  • The problem came from the image version of the managed cluster.
    By default the managed cluster used image version 1.2.31, while my existing cluster was using image 1.2.28.
    When I added --image-version=1.2.28 to the managed cluster config, it worked.

    Dataproc image 1.2.31 upgraded Spark to 2.2.1, which introduced [SPARK-22472]:

    SPARK-22472: added null check for top-level primitive types. Before this release, for datasets having top-level primitive types, and it has null values, it might return some unexpected results. For example, let’s say we have a parquet file with schema <a: Int>, and we read it into Scala Int. If column a has null values, when transformation is applied some unexpected value can be returned.

    This likely added just enough generated code to push the generated classes over the 64k limit.
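
For reference, the fix described above can be sketched as the question's set-managed-cluster command with the image version pinned. This is only a sketch: the IMAGE_VERSION variable and the dry-run echo are additions for illustration, every other setting is copied from the question, and the elided init-action path is left exactly as it was posted.

```shell
# Image version of the existing cluster on which the job is known to work.
# (IMAGE_VERSION is an illustrative variable name, not part of the original commands.)
IMAGE_VERSION="1.2.28"

# Same managed-cluster settings as in the question, plus a pinned image version.
# The command is stored in the positional parameters so it can be reviewed first.
set -- gcloud beta dataproc workflow-templates set-managed-cluster lifestage-workflow \
  --master-machine-type n1-standard-8 \
  --worker-machine-type n1-standard-16 \
  --num-workers 6 \
  --cluster-name lifestage-workflow-cluster \
  --initialization-actions gs://..../init.sh \
  --zone europe-west2-b \
  --region europe-west2 \
  --image-version="$IMAGE_VERSION"

# Dry run: print the command. Replace this line with a bare "$@" to execute it.
echo "$@"
```

To find out which image an existing cluster runs, gcloud dataproc clusters describe reports it under config.softwareConfig.imageVersion. Pinning the version also keeps later default-image upgrades from silently changing the Spark version used by the workflow.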