
Google Cloud DataFlow job not available yet.. in Airflow


I am running a Dataflow job from Airflow. I should mention that I am a newbie to Airflow. The Dataflow job (launched from Airflow) completes successfully, but Airflow seems to have a problem fetching the job status, and I receive this message indefinitely:

Google Cloud DataFlow job not available yet..

Here are the logs from just after all steps were added to the Dataflow job (I substituted {projectID} and {jobID} where the actual values appeared):

[2018-10-01 13:00:13,987] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,987] {gcp_dataflow_hook.py:128} WARNING - b'INFO: Staging pipeline description to gs://my-project/staging'

[2018-10-01 13:00:13,987] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,987] {gcp_dataflow_hook.py:128} WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'

[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-10-01_06_00_12-{jobID}?project={projectID}'

[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'

[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b"INFO: To cancel the job using the 'gcloud' tool, run:"

[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'> gcloud dataflow jobs --project={projectID} cancel --region=us-central1 2018-10-01_06_00_12-{jobID}'

[2018-10-01 13:00:13,990] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,990] {discovery.py:267} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest

[2018-10-01 13:00:14,417] {logging_mixin.py:95} INFO - [2018-10-01 13:00:14,417] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:14,593] {logging_mixin.py:95} INFO - [2018-10-01 13:00:14,593] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..

[2018-10-01 13:00:29,614] {logging_mixin.py:95} INFO - [2018-10-01 13:00:29,614] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:29,772] {logging_mixin.py:95} INFO - [2018-10-01 13:00:29,772] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..

[2018-10-01 13:00:44,790] {logging_mixin.py:95} INFO - [2018-10-01 13:00:44,790] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json

[2018-10-01 13:00:44,937] {logging_mixin.py:95} INFO - [2018-10-01 13:00:44,937] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..

Do you know what could be causing this? I couldn't find any solution related to this issue. Should I provide more information?

Here is my task in DAG:

# dataflow task
dataflow_t = DataFlowJavaOperator(
    task_id='mydataflow',
    jar='/lib/dataflow_test.jar',
    gcp_conn_id='my_gcp_conn',
    delegate_to='{service_account}@{projectID}.iam.gserviceaccount.com',
    dag=dag)

and the Dataflow-related options in the DAG's default_args:

'dataflow_default_options': {
    'project': '{projectID}',
    'stagingLocation': 'gs://my-project/staging'
}
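For context, the operator passes each entry in dataflow_default_options to the jar as a --key=value command-line argument. A minimal sketch of that flattening (the helper name is mine, not Airflow's):

```python
def options_to_flags(options):
    # Turn each Dataflow option into a --key=value argument,
    # roughly as the operator does when building the
    # `java -jar ...` command line.
    return ['--{}={}'.format(k, v) for k, v in sorted(options.items())]

flags = options_to_flags({
    'project': 'my-project',
    'stagingLocation': 'gs://my-project/staging',
})
print(flags)  # ['--project=my-project', '--stagingLocation=gs://my-project/staging']
```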

Solution

  • I faced the same issue. I had set a job name in DataflowPipelineOptions, but Airflow also generates a job name based on the task_id you provide.

    This causes a conflict: Airflow cannot find the actual job, because it is running
    under the name you set via DataflowPipelineOptions rather than the name Airflow
    expects.

    Simply remove the job name from DataflowPipelineOptions and it will work.
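To see why the two names clash, here is a simplified, hypothetical sketch of the hook's polling step (function names and the exact name-derivation rule are my assumptions, not Airflow's code): the hook derives an expected job name from the task_id and scans the job list returned by the Dataflow API, so a job named via DataflowPipelineOptions never matches and the "not available yet" branch repeats forever.

```python
def expected_job_name(task_id):
    # Assumed simplification: Airflow derives a Dataflow job name
    # from the task_id (roughly lowercased, underscores to hyphens).
    return task_id.lower().replace('_', '-')

def find_job(jobs, task_id):
    # Simplified stand-in for the hook's polling step: look for a
    # listed job whose name starts with the derived name.
    wanted = expected_job_name(task_id)
    for job in jobs:
        if job['name'].startswith(wanted):
            return job
    return None  # -> logs "Google Cloud DataFlow job not available yet.."

# The pipeline set its own name via DataflowPipelineOptions, so the
# API listing contains that name instead of the derived one:
jobs = [{'name': 'my-custom-pipeline-name', 'currentState': 'JOB_STATE_RUNNING'}]
print(find_job(jobs, 'mydataflow'))  # None: the names never match
```

With the custom name removed, the job is created under the Airflow-derived name and the same scan succeeds on the next poll.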