Tags: docker, google-cloud-dataflow, gcloud

Dataflow Flex Template launches 2 Dataflow Jobs


I've packaged a Python ETL job as a Dataflow Flex Template, which means I launch it with a command of the form:

gcloud dataflow flex-template run $DF_JOB_NAME --template-file-gcs-location $FLEX_SPEC_PATH --region $REGION --service-account-email $SERVICE_ACCOUNT_EMAIL

I've successfully built and uploaded the Flex Template Image and Spec, and am able to run the job using Dataflow.

However, whenever I kick off a run, two separate Dataflow jobs are started: the first appears to be a launcher job that builds the Docker container the pipeline will run on, and that job then kicks off the actual pipeline job.

The first job always fails with a polling timeout, while the second job succeeds.

The problem is very similar to situation #3 in this post: Dataflow flex template job attempts to launch second job (for pipeline) with same job_name

Here is my Dockerfile:

FROM gcr.io/dataflow-templates-base/python39-template-launcher-base 


RUN apt-get update
# Upgrade pip and install the requirements.
RUN pip install --no-cache-dir --upgrade pip
RUN pip install apache-beam[gcp]
RUN pip install google-cloud-secret-manager==2.16.0

RUN mkdir -p /dataflow/template
RUN mkdir -p /dataflow/src
WORKDIR /dataflow/template

COPY src/test_job_name.py /dataflow/template/test_job_name.py
COPY src/test_job_name.json /dataflow/template/test_job_name.json
COPY __init__.py /dataflow/template/__init__.py
COPY setup.py /dataflow/template/setup.py

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=""
ENV FLEX_TEMPLATE_PYTHON_PY_OPTIONS=""
ENV FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES=""
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/dataflow/template/run_test_job_name.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="/dataflow/template/setup.py"

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

Here are the argument-parsing commands I use in the run() function of my main Python file:

parser = argparse.ArgumentParser()
parser.add_argument("-ds", "--date")
parser.add_argument("-p", "--project")
parser.add_argument("-t", "--temp_location")
parser.add_argument("-s", "--staging_location")
parser.add_argument("-r", "--runner")
parser.add_argument("-se", "--setup")
parser.add_argument("-sa", "--service_account_email")
parser.add_argument("-re", "--region")
args, additional_dataflow_args = parser.parse_known_args()

Here are the PipelineOptions I'm passing to my Dataflow pipeline (in the pipeline's Python file):

p_options = {
    "project": args.project,
    "staging_location": args.staging_location,
    "temp_location": args.temp_location,
    "runner": args.runner,
    "setup_file": args.setup,
    "region": args.region,
    "save_main_session": True,
    "service_account_email": args.service_account_email,
}
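
(How this dictionary actually reaches Beam isn't shown above; purely for context, and as an assumption rather than the asker's actual code, it would typically be handed over with something like:)

from apache_beam.options.pipeline_options import PipelineOptions

# Assumed conversion step (not shown in the question): build the
# PipelineOptions object directly from the dictionary of options.
options = PipelineOptions.from_dictionary(p_options)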

I've tried the solution suggested in that post (ensuring that the template image is only passed to the job once), but I find that I am already passing the Dataflow template image to only one of the jobs (via the gcloud command shown above).

I've also tried removing the "job_name" pipeline option from the Python file. In that case, Dataflow still kicks off two jobs, one of which gets a random name.


Solution

  • Isn't it the case that you are not forwarding the original args sent to the program on to the pipeline (i.e., the beam_args from args, beam_args = parser.parse_known_args())?

    That is an important step to make sure that the templated job is launched as expected, instead of a separate job being created.

    Please check https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#python (and https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/flex-templates/streaming_beam/streaming_beam.py) for a complete example.
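
    Concretely, following the linked sample, a minimal sketch of that pattern (the --date argument is only illustrative; every other option is forwarded to Beam rather than rebuilt by hand) would look like:

    import argparse

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    def run():
        parser = argparse.ArgumentParser()
        # Declare only pipeline-specific arguments here; "--date" stands in
        # for whatever the ETL job actually needs.
        parser.add_argument("--date")
        args, beam_args = parser.parse_known_args()

        # Forward the leftover args (runner, project, region, temp_location,
        # and the options injected by the Flex Template launcher) straight
        # to Beam instead of rebuilding them in a dictionary.
        options = PipelineOptions(beam_args, save_main_session=True)

        with beam.Pipeline(options=options) as pipeline:
            pipeline | beam.Create([args.date]) | beam.Map(print)


    if __name__ == "__main__":
        run()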