airflow, google-cloud-composer, google-cloud-data-fusion

How to specify which GCP project to use when triggering a pipeline through Data Fusion operator on Cloud Composer


I need to trigger a Data Fusion pipeline located in a GCP project called myDataFusionProject through a Data Fusion operator (CloudDataFusionStartPipelineOperator) inside a DAG whose Cloud Composer instance is located in another project called myCloudComposerProject.

I have used the official documentation as well as the source code to write code that roughly resembles the snippet below:

LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance": "someOutputInstance", "input.dataset": "someInputDataset", "input.project": "someInputProject"}

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)

My issue is that, every time I trigger the DAG, Cloud Composer looks for myDataFusionInstance inside myCloudComposerProject instead of myDataFusionProject, which gives an error like this one:

googleapiclient.errors.HttpError: <HttpError 404 when requesting https://datafusion.googleapis.com/v1beta1/projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance?alt=json returned "Resource 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance' was not found". Details: "[{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance'}]"

So the question is: how can I force my operator to use the Data Fusion project instead of the Cloud Composer project? I suspect I can do that by adding a new runtime argument, but I'm not sure how.

Last piece of information: the Data Fusion pipeline simply extracts data from a BigQuery source and sends everything to a BigTable sink.


Solution

  • As a general recommendation when developing with Airflow operators, check the classes that implement them, since the documentation may lack some information depending on the version.

    As mentioned in the comments, if you check CloudDataFusionStartPipelineOperator you will find that it uses a hook that fetches the instance based on a project_id. This project_id is optional, so you can pass your own.

    class CloudDataFusionStartPipelineOperator(BaseOperator):
        ...

        def __init__(
            ...
            project_id: Optional[str] = None,   ### NOT MENTIONED IN THE DOCUMENTATION
            ...
        ) -> None:
            ...
            self.project_id = project_id
            ...

        def execute(self, context: dict) -> str:
            ...
            instance = hook.get_instance(
                instance_name=self.instance_name,
                location=self.location,
                project_id=self.project_id,  ### defaults to the connection's project id when not set
            )
            api_url = instance["apiEndpoint"]
            ...
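
    For context on why the 404 points at myCloudComposerProject: when project_id is left as None, the Google provider hooks fall back to the project configured on the Airflow GCP connection, and on Cloud Composer that connection points at the Composer project by default. The snippet below is only a simplified sketch of that fallback (assuming the fallback_to_default_project_id decorator on GoogleBaseHook behaves as described), not the exact provider code:

    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


    class DataFusionHook(GoogleBaseHook):
        """Simplified sketch of the hook, not the real implementation."""

        @GoogleBaseHook.fallback_to_default_project_id
        def get_instance(self, instance_name: str, location: str, project_id: str) -> dict:
            # project_id is either the value passed down from the operator or,
            # when that value is None, the default project of the GCP connection
            # (the Composer project on a stock Cloud Composer environment).
            ...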
    

    Adding the parameter to your operator call should fix your issue.

    PROJECT_ID = "myDataFusionProject"  # the project that hosts the Data Fusion instance

    start_pipeline = CloudDataFusionStartPipelineOperator(
        location=LOCATION,
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        runtime_args=RUNTIME_ARGS,
        project_id=PROJECT_ID,
        task_id="start_pipeline",
    )
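
    For completeness, here is a minimal sketch of how the task could sit inside a DAG file. The DAG id, start date and schedule are placeholders, the constants repeat the values from the question, and gcp_conn_id only needs to be set if you use a connection other than the default google_cloud_default:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.datafusion import (
        CloudDataFusionStartPipelineOperator,
    )

    # Constants taken from the question / answer above
    LOCATION = "someLocation"
    PIPELINE_NAME = "myDataFusionPipeline"
    INSTANCE_NAME = "myDataFusionInstance"
    PROJECT_ID = "myDataFusionProject"
    RUNTIME_ARGS = {"output.instance": "someOutputInstance", "input.dataset": "someInputDataset", "input.project": "someInputProject"}

    with DAG(
        dag_id="start_datafusion_pipeline",  # placeholder DAG id
        start_date=datetime(2021, 1, 1),     # placeholder start date
        schedule_interval=None,              # trigger manually
        catchup=False,
    ) as dag:
        start_pipeline = CloudDataFusionStartPipelineOperator(
            task_id="start_pipeline",
            location=LOCATION,
            pipeline_name=PIPELINE_NAME,
            instance_name=INSTANCE_NAME,
            runtime_args=RUNTIME_ARGS,
            project_id=PROJECT_ID,  # myDataFusionProject, not the Composer project
            # gcp_conn_id="google_cloud_default",  # override if you use a dedicated connection
        )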
    

    As a final note, besides the official documentation site you can also explore the Apache Airflow source files on GitHub.