If I want to execute the shell commands below from Python (through an API call), how can I do that?
# Query the run history of a Cloud Data Fusion pipeline via the CDAP REST API.
# Reads CDF_REGION, CDF_INSTANCE_ID and CDF_PIPELINE from the environment;
# none of them is set in this snippet.

# Access token from gcloud, sent as the Bearer credential on each request below.
AUTH_TOKEN=$(gcloud auth print-access-token)
# API endpoint of the Data Fusion instance (the instance's apiEndpoint field).
CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
--location=${CDF_REGION} \
--format="value(apiEndpoint)" \
${CDF_INSTANCE_ID})
# Build the workflow URL for the pipeline inside the "default" namespace.
WF_URL="${CDAP_ENDPOINT}/v3/namespaces/default/apps"
WF_URL=${WF_URL}/${CDF_PIPELINE}/workflows/DataPipelineWorkflow
# GET <workflow>/runs and extract the "runid" of every element in the returned array.
RUN_IDS=`curl -s -X GET \
-H "Authorization: Bearer ${AUTH_TOKEN}" \
"${WF_URL}/runs" \
| jq -r '.[].runid'`
# GET the record of one run and print its runid, starting, start, end and status.
# NOTE(review): RUN_ID is never assigned in this snippet (only RUN_IDS is);
# presumably it should be set to one of the ids in RUN_IDS first - confirm.
RUN_DETAILS=`curl -s -X GET \
-H "Authorization: Bearer ${AUTH_TOKEN}" \
"${WF_URL}/runs/${RUN_ID}" \
| jq -r '.runid, .starting, .start, .end, .status'`
Regards, Santanu
See the code implementation below for your use case.
import requests
from google.cloud import data_fusion_v1
import google.auth
import google.auth.transport.requests
def get_endpoint(project_id, location, instance_name,
                 pipeline_name="DataFusionQuickstart", namespace="default"):
    """Return the CDAP "runs" URL of a pipeline on a Data Fusion instance.

    The instance is looked up through the Data Fusion API and its
    ``api_endpoint`` is combined with the pipeline's workflow path.

    Args:
        project_id: Project that owns the Data Fusion instance.
        location: Location (region) of the instance, e.g. "us-central1".
        instance_name: Name of the Data Fusion instance.
        pipeline_name: Name of the deployed pipeline. Defaults to
            "DataFusionQuickstart"; change it to the actual pipeline name.
        namespace: CDAP namespace that holds the pipeline.

    Returns:
        The URL string
        ``<api_endpoint>/v3/namespaces/<namespace>/apps/<pipeline_name>/workflows/DataPipelineWorkflow/runs``.
    """
    client = data_fusion_v1.DataFusionClient()
    name = f"projects/{project_id}/locations/{location}/instances/{instance_name}"
    request = data_fusion_v1.GetInstanceRequest(name=name)
    response = client.get_instance(request=request)
    api_endpoint = response.api_endpoint
    return (
        f"{api_endpoint}/v3/namespaces/{namespace}/apps/{pipeline_name}"
        "/workflows/DataPipelineWorkflow/runs"
    )
def auth_gcp():
    """Return a freshly refreshed access token for the default credentials.

    The credentials are requested with the cloud-platform scope and refreshed
    once before the token is read from them.
    """
    scopes = ['https://www.googleapis.com/auth/cloud-platform']
    # google.auth.default() also returns a project; it is not needed here.
    credentials, _ = google.auth.default(scopes=scopes)
    credentials.refresh(google.auth.transport.requests.Request())
    return credentials.token
def get_run_ids(api_endpoint, token, timeout=30):
    """Return the run ids listed at the pipeline's "runs" endpoint.

    Args:
        api_endpoint: URL of the workflow's "runs" endpoint.
        token: Access token sent as a Bearer credential.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A list with the "runid" value of every run record in the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    headers = {"Authorization": f"Bearer {token}"}
    runs_response = requests.get(url=api_endpoint, headers=headers, timeout=timeout)
    # Fail on 4xx/5xx here instead of with a confusing JSON or KeyError below.
    runs_response.raise_for_status()
    return [run["runid"] for run in runs_response.json()]
def get_run_details(api_endpoint, run_ids, token, timeout=30):
    """Print runid, starting, start, end and status for each given run.

    Args:
        api_endpoint: URL of the workflow's "runs" endpoint.
        run_ids: Iterable of run id strings to look up.
        token: Access token sent as a Bearer credential.
        timeout: Seconds to wait for the server on each request.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    headers = {"Authorization": f"Bearer {token}"}
    for run_id in run_ids:
        run_api_endpoint = api_endpoint + "/" + run_id
        response = requests.get(url=run_api_endpoint, headers=headers, timeout=timeout)
        response.raise_for_status()
        run = response.json()
        print("run_id : " + run["runid"])
        # Use .get() for the timestamps so an absent field prints "None"
        # instead of raising KeyError (presumably "end" is missing while a
        # run is still in progress - confirm against the CDAP run record).
        print("starting: " + str(run.get("starting")))
        print("start: " + str(run.get("start")))
        print("end: " + str(run.get("end")))
        print("status: " + run["status"])
        print("=========")
# Placeholders: replace with the real project, region and instance.
PROJECT_ID = "your_project_id"
LOCATION = "us-central1"
INSTANCE_NAME = "your_instance_name"

# Resolve the pipeline's runs URL, authenticate, then list and print every run.
api_endpoint = get_endpoint(
    project_id=PROJECT_ID,
    location=LOCATION,
    instance_name=INSTANCE_NAME,
)
token = auth_gcp()
run_ids = get_run_ids(api_endpoint=api_endpoint, token=token)
get_run_details(api_endpoint=api_endpoint, run_ids=run_ids, token=token)
Test run: