
GCP Dataproc - cluster creation failing when using connectors.sh in initialization-actions


I'm creating a Dataproc cluster, and creation times out when I add connectors.sh to the initialization actions.

Here are the command and the error:

NUM_WORKER=2
TYPE=n1-highmem-8
CNAME=dataproc-poc
BUCKET=dataproc-spark-karan
REGION=us-central1
ZONE=us-central1-c
IMG_VERSION=2.0.29-debian10
PROJECT=versa-kafka-poc

Karans-MacBook-Pro:dataproc-versa-sase karanalang$ gcloud beta dataproc clusters create $CNAME \
>            --enable-component-gateway \
>            --bucket $BUCKET \
>            --region $REGION \
>            --zone $ZONE \
>            --no-address --master-machine-type $TYPE \
>            --master-boot-disk-size 100 \
>            --master-boot-disk-type pd-ssd \
>            --num-workers $NUM_WORKER \
>            --worker-machine-type $TYPE \
>            --worker-boot-disk-type pd-ssd \
>            --worker-boot-disk-size 100 \
>            --image-version $IMG_VERSION \
>            --scopes 'https://www.googleapis.com/auth/cloud-platform' \
>            --project $PROJECT \
>            --initialization-actions 'gs://dataproc-kafka/config/pip_install.sh','gs://dataproc-kafka/config/connectors.sh' \
>            --metadata 'gcs-connector-version=2.0.0' \
>            --metadata 'bigquery-connector-version=1.2.0' \
>            --properties 'dataproc:dataproc.logging.stackdriver.job.driver.enable=true,dataproc:job.history.to-gcs.enabled=true,spark:spark.dynamicAllocation.enabled=false,spark:spark.executor.instances=6,spark:spark.executor.cores=2,spark:spark.eventLog.dir=gs://dataproc-spark-karan/joblogs,spark:spark.history.fs.logDirectory=gs://dataproc-spark-karan/joblogs'
Waiting on operation [projects/versa-kafka-poc/regions/us-central1/operations/8aa13a77-30a8-3a84-a949-16b4d8907c45].
Waiting for cluster creation operation...
WARNING: This cluster is configured to use network 'https://www.googleapis.com/compute/v1/projects/versa-kafka-poc/global/networks/default' and its associated firewall rules '[prometheus-nodeport]' which contains the following potential security vulnerability: 'port 8088 is open to the internet, this may allow arbitrary code execution via the YARN REST API. Use Component Gateway for secure remote access to the YARN UI and other cluster UIs instead: https://cloud.google.com/dataproc/docs/concepts/accessing/dataproc-gateways.'
Waiting for cluster creation operation...done.
ERROR: (gcloud.beta.dataproc.clusters.create) Operation [projects/versa-kafka-poc/regions/us-central1/operations/8aa13a77-30a8-3a84-a949-16b4d8907c45] timed out.

connectors.sh

#!/bin/bash

set -euxo pipefail

VM_CONNECTORS_HADOOP_DIR=/usr/lib/hadoop/lib
VM_CONNECTORS_DATAPROC_DIR=/usr/local/share/google/dataproc/lib

declare -A MIN_CONNECTOR_VERSIONS
MIN_CONNECTOR_VERSIONS=(
  ["bigquery"]="0.11.0"
  ["gcs"]="1.7.0")

# Starting from these versions connectors name changed:
# "...-<version>-hadoop2.jar" -> "...-hadoop2-<version>.jar"
declare -A NEW_NAME_MIN_CONNECTOR_VERSIONS
NEW_NAME_MIN_CONNECTOR_VERSIONS=(
  ["bigquery"]="0.13.5"
  ["gcs"]="1.9.5")

BIGQUERY_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/bigquery-connector-version || true)
GCS_CONNECTOR_VERSION=$(/usr/share/google/get_metadata_value attributes/gcs-connector-version || true)

UPDATED_GCS_CONNECTOR=false

is_worker() {
  local role
  role="$(/usr/share/google/get_metadata_value attributes/dataproc-role || true)"
  if [[ $role != Master ]]; then
    return 0
  fi
  return 1
}

min_version() {
  echo -e "$1\n$2" | sort -r -t'.' -n -k1,1 -k2,2 -k3,3 | tail -n1
}

validate_version() {
  local name=$1    # connector name: "bigquery" or "gcs"
  local version=$2 # connector version
  local min_valid_version=${MIN_CONNECTOR_VERSIONS[$name]}
  if [[ "$(min_version "$min_valid_version" "$version")" != "$min_valid_version" ]]; then
    echo "ERROR: $name-connector version should be greater than or equal to $min_valid_version, but was $version"
    return 1
  fi
}

update_connector() {
  local name=$1    # connector name: "bigquery" or "gcs"
  local version=$2 # connector version
  if [[ $version ]]; then
    if [[ $name == gcs ]]; then
      UPDATED_GCS_CONNECTOR=true
    fi

    # validate new connector version
    validate_version "$name" "$version"

    if [[ -d ${VM_CONNECTORS_DATAPROC_DIR} ]]; then
      local vm_connectors_dir=${VM_CONNECTORS_DATAPROC_DIR}
    else
      local vm_connectors_dir=${VM_CONNECTORS_HADOOP_DIR}
    fi

    # remove old connector
    rm -f "${vm_connectors_dir}/${name}-connector-"*

    # download new connector
    # connector name could be in one of 2 formats:
    # 1) gs://hadoop-lib/${name}/${name}-connector-hadoop2-${version}.jar
    # 2) gs://hadoop-lib/${name}/${name}-connector-${version}-hadoop2.jar
    local new_name_min_version=${NEW_NAME_MIN_CONNECTOR_VERSIONS[$name]}
    if [[ "$(min_version "$new_name_min_version" "$version")" == "$new_name_min_version" ]]; then
      local jar_name="${name}-connector-hadoop2-${version}.jar"
    else
      local jar_name="${name}-connector-${version}-hadoop2.jar"
    fi
    gsutil cp "gs://hadoop-lib/${name}/${jar_name}" "${vm_connectors_dir}/"

    # Update or create version-less connector link
    ln -s -f "${vm_connectors_dir}/${jar_name}" "${vm_connectors_dir}/${name}-connector.jar"
  fi
}

if [[ -z $BIGQUERY_CONNECTOR_VERSION ]] && [[ -z $GCS_CONNECTOR_VERSION ]]; then
  echo "ERROR: None of connector versions are specified"
  exit 1
fi

# because connectors from 1.7 branch are not compatible with previous connectors
# versions (they have the same class relocation paths) we need to update both
# of them, even if only one connector version is set
if [[ -z $BIGQUERY_CONNECTOR_VERSION ]] && [[ $GCS_CONNECTOR_VERSION == "1.7.0" ]]; then
  BIGQUERY_CONNECTOR_VERSION="0.11.0"
fi
if [[ $BIGQUERY_CONNECTOR_VERSION == "0.11.0" ]] && [[ -z $GCS_CONNECTOR_VERSION ]]; then
  GCS_CONNECTOR_VERSION="1.7.0"
fi

update_connector "bigquery" "$BIGQUERY_CONNECTOR_VERSION"
update_connector "gcs" "$GCS_CONNECTOR_VERSION"

if [[ $UPDATED_GCS_CONNECTOR != true ]]; then
  echo "GCS connector wasn't updated - no need to restart any services"
  exit 0
fi

# Restart YARN NodeManager service on worker nodes so they can pick up updated GCS connector
if is_worker; then
  systemctl kill -s KILL hadoop-yarn-nodemanager
fi

# Restarts Dataproc Agent after successful initialization
# WARNING: this function relies on undocumented and not officially supported Dataproc Agent
# "sentinel" files to determine successful Agent initialization and is not guaranteed
# to work in the future. Use at your own risk!
restart_dataproc_agent() {
  # Because Dataproc Agent should be restarted after initialization, we need to wait until
  # it creates a sentinel file that signals initialization completion (success or failure)
  while [[ ! -f /var/lib/google/dataproc/has_run_before ]]; do
    sleep 1
  done
  # If Dataproc Agent didn't create a sentinel file that signals initialization
  # failure then it means that initialization succeeded and it should be restarted
  if [[ ! -f /var/lib/google/dataproc/has_failed_before ]]; then
    systemctl kill -s KILL google-dataproc-agent
  fi
}
export -f restart_dataproc_agent

# Schedule asynchronous Dataproc Agent restart so it will use updated connectors.
# It cannot be restarted synchronously because Dataproc Agent should be restarted
# after its initialization, including init actions execution, has been completed.
bash -c restart_dataproc_agent &
disown
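
As a side check, the jar name the script will request for a given version can be worked out locally, without creating a cluster. The sketch below copies min_version from the script above and wraps the naming branch of update_connector in jar_name_for, a hypothetical helper that is not part of connectors.sh. The thresholds passed in are the NEW_NAME_MIN_CONNECTOR_VERSIONS values from the script.

```shell
#!/bin/bash
set -euo pipefail

# Same helper as in connectors.sh: prints the smaller of two dotted versions.
min_version() {
  echo -e "$1\n$2" | sort -r -t'.' -n -k1,1 -k2,2 -k3,3 | tail -n1
}

# Hypothetical helper (not in connectors.sh): mirrors the jar naming branch
# of update_connector.
#   $1 = connector name, $2 = requested version,
#   $3 = first version that uses the "...-hadoop2-<version>.jar" naming
jar_name_for() {
  local name=$1 version=$2 new_name_min=$3
  if [[ "$(min_version "$new_name_min" "$version")" == "$new_name_min" ]]; then
    echo "${name}-connector-hadoop2-${version}.jar"
  else
    echo "${name}-connector-${version}-hadoop2.jar"
  fi
}

# Versions from the cluster create command in this question.
jar_name_for gcs 2.0.0 1.9.5        # prints gcs-connector-hadoop2-2.0.0.jar
jar_name_for bigquery 1.2.0 0.13.5  # prints bigquery-connector-hadoop2-1.2.0.jar
```

Whether an object with that name actually exists under gs://hadoop-lib/ can then be checked with gsutil ls before the cluster is created.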

From what I understand, connectors.sh just ensures that the correct versions of the connectors are installed on the cluster. Also, without connectors.sh, cluster creation succeeds.

How do I debug or fix this? Thanks in advance!


Solution

  • It seems you are using an old version of the init action script. Based on the documentation from the Dataproc GitHub repo, you can set the version of the Hadoop GCS connector without the script in the following manner:

    gcloud dataproc clusters create ${CLUSTER_NAME} \
        --region ${REGION} \
        --metadata GCS_CONNECTOR_VERSION=2.2.2
    

    For the BigQuery connectors (Spark or Hadoop MR), please use the up-to-date init action in the following manner:

        --initialization-actions gs://${BUCKET}/connectors.sh \
        --metadata bigquery-connector-version=1.2.0 \
        --metadata spark-bigquery-connector-version=0.23.2
    

    Note that the same repo also contains an updated pip-install init action.
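
The two fragments above could be combined into a single command roughly as follows. This is a sketch, not a verified configuration. The cluster name and region come from the question. The BUCKET default is a placeholder for wherever you copy the current connectors.sh, and the DRY_RUN switch is an illustrative addition. The three metadata keys from the fragments are passed as one comma-separated --metadata value. By default the script only prints the command; set DRY_RUN=0 to actually run it.

```shell
#!/bin/bash
set -euo pipefail

# Cluster name and region are taken from the question; override as needed.
CLUSTER_NAME=${CLUSTER_NAME:-dataproc-poc}
REGION=${REGION:-us-central1}
# Placeholder (assumption): the bucket holding your copy of the current connectors.sh.
BUCKET=${BUCKET:-my-init-actions-bucket}

# Metadata keys and versions exactly as given in the answer above.
METADATA="GCS_CONNECTOR_VERSION=2.2.2"
METADATA+=",bigquery-connector-version=1.2.0"
METADATA+=",spark-bigquery-connector-version=0.23.2"

# Build the gcloud arguments as an array so every value stays one word.
CREATE_ARGS=(
  dataproc clusters create "$CLUSTER_NAME"
  --region "$REGION"
  --initialization-actions "gs://${BUCKET}/connectors.sh"
  --metadata "$METADATA"
)

if [[ ${DRY_RUN:-1} == 1 ]]; then
  # Print the command for review instead of running it.
  echo "gcloud ${CREATE_ARGS[*]}"
else
  gcloud "${CREATE_ARGS[@]}"
fi
```

The remaining flags from the original command (machine types, disks, --no-address, --properties and so on) are left out for brevity and can be appended to the array unchanged.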