I am trying to check the state of a Data Fusion pipeline with Cloud Composer. In the DAG, I have the following code, which is copied from the Airflow website:
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="start_pipeline_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_task.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
However, when I run the DAG in Cloud Composer, I get an invalid argument error for failure_statuses, even though the argument is present in the source code here.
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
result = func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to CloudDataFusionPipelineStateSensor (task_id: start_pipeline_sensor). Invalid arguments were:
**kwargs: {'failure_statuses': ['FAILED']}
What could cause this? It is working fine without the failure_statuses argument.
The failure_statuses parameter for the CloudDataFusionPipelineStateSensor wasn't introduced until v6.0.0 of the Google provider for Airflow. The example DAG you copied reflects that version of the provider, so your Cloud Composer environment most likely has an older provider installed, which rejects the unknown argument. Try upgrading to the latest Google provider and the example should work.
Be aware that there were some breaking changes between v5.1.0 and v6.0.0 of the provider.
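To confirm which provider version the environment actually has, you can read the installed package metadata. The snippet below is a minimal sketch: it assumes the provider is installed under its PyPI name apache-airflow-providers-google, and the helper names (installed_version, sensor_kwargs) are made up for illustration. It only passes failure_statuses when the major version is 6 or higher.

```python
from importlib.metadata import PackageNotFoundError, version

# PyPI distribution name of the Google provider.
PROVIDER_PACKAGE = "apache-airflow-providers-google"


def installed_version(package=PROVIDER_PACKAGE):
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def sensor_kwargs(provider_version):
    """Build the status arguments for the sensor (hypothetical helper).

    failure_statuses is only included for provider v6.0.0 and later,
    since older versions reject it as an invalid argument.
    """
    kwargs = {"expected_statuses": ["COMPLETED"]}
    if provider_version is not None:
        major = int(provider_version.split(".")[0])
        if major >= 6:
            kwargs["failure_statuses"] = ["FAILED"]
    return kwargs


if __name__ == "__main__":
    found = installed_version()
    print("Google provider version:", found)
    print("Sensor status arguments:", sensor_kwargs(found))
```

The resulting dictionary could then be unpacked into the sensor call with **sensor_kwargs(...), although upgrading the provider is the cleaner fix.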
A side note on reading source code in Airflow: as of Airflow 2, releases of core Airflow and of functionality related to service providers (e.g. hooks, operators, and sensors for Google, Databricks, etc.) have been decoupled. This means provider functionality can be released independently of core Airflow. Providers are typically released monthly. The main branch in Airflow reflects the latest code base, but that does not mean it reflects the latest released code, let alone the version you have installed. To make sure you are looking at the correct code for the provider version you have installed, use tags when searching through the source code.
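As a rough illustration of picking the matching tag, the sketch below builds a tag name and a browse URL from a provider version. It assumes that Google provider releases in the apache/airflow repository are tagged as providers-google/<version>; check the repository's tag list to confirm the exact name. The function names are hypothetical.

```python
# Assumption: provider releases are tagged "providers-google/<version>"
# in the apache/airflow GitHub repository.
REPO_URL = "https://github.com/apache/airflow"


def provider_tag(provider_version):
    """Return the assumed git tag name for a Google provider release."""
    return f"providers-google/{provider_version}"


def source_url(provider_version):
    """Return a URL for browsing the repository at the assumed tag."""
    return f"{REPO_URL}/tree/{provider_tag(provider_version)}"


if __name__ == "__main__":
    # Compare the sensor's signature in the version you run against v6.0.0.
    for v in ("5.1.0", "6.0.0"):
        print(v, "->", source_url(v))
```

Browsing the source at the tag for your installed version shows the sensor's constructor as your environment sees it, rather than as it appears on main.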