Search code examples
Tags: python, shell, google-cloud-platform, google-cloud-data-fusion, cdap

Data Fusion Endpoint API call from Python


If I want to execute the shell commands below from Python (through API calls), how can I do that?

# Access token printed by gcloud; sent below as the Bearer credential.
AUTH_TOKEN=$(gcloud auth print-access-token)

# Read the instance's apiEndpoint value. CDF_REGION and CDF_INSTANCE_ID are
# not assigned in this snippet and must already be set in the environment.
CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
    --location=${CDF_REGION} \
    --format="value(apiEndpoint)" \
    ${CDF_INSTANCE_ID})

# Base URL for applications in the "default" namespace.
WF_URL="${CDAP_ENDPOINT}/v3/namespaces/default/apps"

# Narrow the URL to the DataPipelineWorkflow of the pipeline named by
# CDF_PIPELINE (also expected to be set beforehand).
WF_URL=${WF_URL}/${CDF_PIPELINE}/workflows/DataPipelineWorkflow

# Fetch the workflow's runs and keep only the runid field of each entry.
RUN_IDS=`curl -s -X GET \
    -H "Authorization: Bearer ${AUTH_TOKEN}" \
    "${WF_URL}/runs" \
    | jq -r '.[].runid'`

# Fetch a single run and extract its id, timestamps and status.
# NOTE(review): RUN_ID is not assigned anywhere in this snippet (only RUN_IDS
# is); presumably it should be one of the ids collected above - confirm.
RUN_DETAILS=`curl -s -X GET \
        -H "Authorization: Bearer ${AUTH_TOKEN}" \
        "${WF_URL}/runs/${RUN_ID}" \
        | jq -r '.runid, .starting, .start, .end, .status'`

Regards, Santanu


Solution

  • See code implementation below for your use case.

    import requests
    from google.cloud import data_fusion_v1
    
    import google.auth
    import google.auth.transport.requests
    
    def get_endpoint(project_id, location, instance_name,
                     pipeline_name="DataFusionQuickstart", namespace="default"):
        """Build the CDAP REST URL that lists the runs of a pipeline's workflow.

        The Data Fusion instance is looked up through the Data Fusion client
        and its ``api_endpoint`` is used as the base of the returned URL.

        Args:
            project_id: Project that owns the Data Fusion instance.
            location: Location of the instance, e.g. ``"us-central1"``.
            instance_name: Name of the Data Fusion instance.
            pipeline_name: Name of the deployed pipeline whose runs are wanted.
                Defaults to the value that was previously hard-coded here.
            namespace: Namespace segment of the URL. Defaults to ``"default"``,
                the value that was previously hard-coded here.

        Returns:
            The ``.../workflows/DataPipelineWorkflow/runs`` URL as a string.
        """
        client = data_fusion_v1.DataFusionClient()
        name = f"projects/{project_id}/locations/{location}/instances/{instance_name}"
        request = data_fusion_v1.GetInstanceRequest(name=name)
        response = client.get_instance(request=request)
        api_endpoint = response.api_endpoint

        return (
            f"{api_endpoint}/v3/namespaces/{namespace}/apps/{pipeline_name}"
            "/workflows/DataPipelineWorkflow/runs"
        )
    
    def auth_gcp():
        """Return an access token for the default Google credentials.

        The credentials are requested with the cloud-platform scope and
        refreshed once so that their ``token`` attribute is populated.
        """
        scopes = ['https://www.googleapis.com/auth/cloud-platform']
        # google.auth.default also returns a project value, which is not used here.
        credentials, _project = google.auth.default(scopes=scopes)
        refresh_request = google.auth.transport.requests.Request()
        credentials.refresh(refresh_request)
        return credentials.token
    
    def get_run_ids(api_endpoint, token, timeout=60):
        """Return the run ids listed at the workflow's runs URL.

        Args:
            api_endpoint: URL of the workflow's ``/runs`` resource.
            token: Access token sent as the Bearer credential.
            timeout: Seconds to wait for the server before giving up, so a
                stalled connection cannot block the caller indefinitely.

        Returns:
            A list with the ``runid`` value of every entry in the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        headers = {"Authorization": f"Bearer {token}"}

        runs_response = requests.get(url=api_endpoint, headers=headers, timeout=timeout)
        # Fail here with the HTTP status rather than later with a confusing
        # JSON/KeyError when the body is an error payload instead of a run list.
        runs_response.raise_for_status()

        return [run["runid"] for run in runs_response.json()]
    
    def get_run_details(api_endpoint, run_ids, token, timeout=60):
        """Fetch each run and print its id, timestamps and status.

        Args:
            api_endpoint: URL of the workflow's ``/runs`` resource; each run id
                is appended to it to address a single run.
            run_ids: Iterable of run id strings to look up.
            token: Access token sent as the Bearer credential.
            timeout: Seconds to wait for each response before giving up.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        headers = {"Authorization": f"Bearer {token}"}

        for run_id in run_ids:
            response = requests.get(
                url=f"{api_endpoint}/{run_id}", headers=headers, timeout=timeout
            )
            response.raise_for_status()
            run = response.json()

            # .get() prints None for a field the record does not carry instead
            # of raising KeyError, matching how the jq filter in the shell
            # version prints null for a missing field.
            print(f"run_id : {run.get('runid')}")
            print(f"starting: {run.get('starting')}")
            print(f"start: {run.get('start')}")
            print(f"end: {run.get('end')}")
            print(f"status: {run.get('status')}")
            print("=========")
    
    
    # Driver: resolve the runs URL, obtain a token, list the run ids,
    # then print the details of every run.
    api_endpoint = get_endpoint(
        project_id="your_project_id",
        location="us-central1",
        instance_name="your_instance_name",
    )
    token = auth_gcp()
    run_ids = get_run_ids(api_endpoint=api_endpoint, token=token)
    get_run_details(api_endpoint=api_endpoint, run_ids=run_ids, token=token)
    

    Test run:

    enter image description here