I already have a working pipeline in Data Fusion that handles the whole ETL process, but I need it to run only when it finds a file called SUCCESS.txt located in a Cloud Storage bucket.
Is this even possible?
On other platforms I used a file watcher (every minute a job runs to verify whether the file I specified exists in a certain location; if the file is there, it executes other jobs), but I can't find something similar here.
Thanks a lot in advance!
You can achieve this by using Cloud Functions GCS triggers with a condition to call the Data Fusion API to start your pipeline only when the uploaded file is SUCCESS.txt.
Note that whether it calls the Data Fusion API or not, the function will trigger on every file upload.
Create a Cloud Function with a Cloud Storage trigger on your bucket, set the entry point to run_pipeline and add your python script (or the example below) in main.py.
import requests
import json
import os


def get_access_token():
    '''
    Gets an access token for the function's service account from the
    metadata server.
    '''
    # scope of the API access; limit it to just Cloud Platform services
    scopes = 'https://www.googleapis.com/auth/cloud-platform'
    headers = {'Metadata-Flavor': 'Google'}
    # metadata server endpoint with the scopes added
    api = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=' + scopes
    # API call to get the access token
    r = requests.get(api, headers=headers).json()
    # return the access token
    return r['access_token']


def run_pipeline(data, context):
    '''
    Calls the Data Fusion API to start the pipeline when the uploaded
    file is SUCCESS.txt.
    '''
    # get the environment variables set in the initial configuration
    # (not all of them are used in this example)
    PROJECT_ID = os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
    TOPIC_ID = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
    PIPELINE_NAME = os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
    INSTANCE_ID = os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
    REGION = os.environ.get('REGION', 'Specified environment variable is not set.')
    NAMESPACE_ID = os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
    CDAP_ENDPOINT = os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')
    # name of the uploaded file that fired the trigger
    file_name = data['name']
    # get the access token
    auth_token = get_access_token()
    # full endpoint of the API call that starts the pipeline
    post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
    # if the pipeline has any macros that need to be set, pass them in as a JSON payload
    payload = json.dumps({"my-file": file_name})
    # add the bearer token to the headers
    post_headers = {"Authorization": "Bearer " + auth_token, "Accept": "application/json"}
    # condition to start the job: only when the uploaded file is SUCCESS.txt
    if file_name == 'SUCCESS.txt':
        # start the job
        r1 = requests.post(post_endpoint, data=payload, headers=post_headers)
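For reference, the data argument passed to the function is the Cloud Storage object metadata from the finalize event, which is why data['name'] is the file name. A trimmed, illustrative example of what it can look like (the values are made up):

# illustrative event payload passed to run_pipeline as `data`;
# only a few fields are shown and the values are made up
example_event = {
    "bucket": "my-bucket",
    "name": "SUCCESS.txt",
    "contentType": "text/plain",
    "timeCreated": "2020-01-01T00:00:00.000Z"
}
# inside Cloud Functions, run_pipeline(example_event, None) would call the
# Data Fusion API because example_event['name'] == 'SUCCESS.txt'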
This way the function will run every time a file is uploaded to the bucket, but the pipeline will be started only when the uploaded file is the SUCCESS.txt file and not any other file. I have tested it and it works fine (based on this post).
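If you need the value for the CDAP_ENDPOINT environment variable, one option is to describe your Data Fusion instance and read its API endpoint. A minimal sketch, assuming the Data Fusion v1 REST API and reusing get_access_token() from the example above (the name get_cdap_endpoint is just for illustration):

import requests

def get_cdap_endpoint(project_id, region, instance_id, auth_token):
    # describe the Data Fusion instance and return its apiEndpoint field
    api = ('https://datafusion.googleapis.com/v1/projects/' + project_id
           + '/locations/' + region + '/instances/' + instance_id)
    headers = {'Authorization': 'Bearer ' + auth_token}
    return requests.get(api, headers=headers).json()['apiEndpoint']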