Tags: google-cloud-platform, etl, google-cloud-dataproc, google-cloud-data-fusion

Run a Data Fusion pipeline only when a file exists


I already have a working pipeline in Data Fusion that performs the full ETL process, but I need it to run only when it finds a file called SUCCESS.txt located in a Cloud Storage bucket.
Is this even possible?
On other platforms I used a file watcher (a job that runs every minute to verify whether the file I specified exists in a certain location and, if it is there, executes other jobs), but I can't find anything similar here.
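
For reference, the polling "file watcher" I mean looks roughly like this sketch in Python (the bucket name is a placeholder), but I would prefer something event-driven in GCP:

    # Sketch of the polling "file watcher" used elsewhere, with the
    # google-cloud-storage client (bucket name is a placeholder).
    from google.cloud import storage

    def success_file_exists(bucket_name='my-bucket', file_name='SUCCESS.txt'):
        client = storage.Client()
        return client.bucket(bucket_name).blob(file_name).exists()

    # a scheduler would run this every minute and, when it returns True,
    # kick off the downstream jobs
    if success_file_exists():
        print('SUCCESS.txt found, start the other jobs')
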
Thanks a lot in advance!


Solution

  • You can achieve this by using a Cloud Functions GCS trigger with a condition that calls the Data Fusion API to start your pipeline only when the uploaded file is SUCCESS.txt.
    Note that the function will trigger on every file upload to the bucket; the condition only controls whether the Data Fusion API is called.

    When you create the Cloud Function:

    1. Choose the Cloud Storage trigger type and the Finalize/Create event type.


    2. Add the environment variables the script expects (PROJECT_ID, TOPIC_ID, PIPELINE_NAME, INSTANCE_ID, REGION, NAMESPACE_ID, CDAP_ENDPOINT) with your own values and click Next.


    3. Set the runtime to Python 3.7, set the entry point to the name of the Python function (in this case, run_pipeline), and add your Python script (or the example below) in main.py.


    import requests
    import json
    import os
    
    def get_access_token():
    
        # scope of the API access; limit it to Cloud Platform services
        scopes = 'https://www.googleapis.com/auth/cloud-platform'
        headers = {'Metadata-Flavor': 'Google'}
    
        # metadata server token endpoint, with the scopes added
        api = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes
    
        # API call to get the access token
        r = requests.get(api, headers=headers).json()
    
        # return the access token
        return r['access_token']
    
    def run_pipeline(data, context):
        '''
        Calls the Data Fusion API to start the pipeline
        '''
    
        # get the environment variables set in the initial configuration
        PROJECT_ID = os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
        TOPIC_ID = os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
        PIPELINE_NAME = os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
        INSTANCE_ID = os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
        REGION = os.environ.get('REGION', 'Specified environment variable is not set.')
        NAMESPACE_ID = os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
        CDAP_ENDPOINT = os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')
    
        # name of the uploaded file that triggered the function
        file_name = data['name']
    
        # get the access token
        auth_token = get_access_token()
    
        # full endpoint of the API call
        post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
    
        # if the pipeline has any macros that need to be set, pass them as a JSON payload
        payload = json.dumps({"my-file": file_name})
    
        # add the bearer token to the headers
        post_headers = {"Authorization": "Bearer " + auth_token, "Accept": "application/json"}
    
        # condition to start the job: only react to the trigger file
        if file_name == 'SUCCESS.txt':
            # start the pipeline
            r = requests.post(post_endpoint, data=payload, headers=post_headers)
            print('Pipeline start requested, status code: ' + str(r.status_code))
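
    If you would rather not call the metadata endpoint yourself, the same token can typically be obtained with the google-auth client library; this is a sketch of a drop-in replacement for get_access_token above (add google-auth to requirements.txt if it is not already available in your runtime):

    import google.auth
    import google.auth.transport.requests

    def get_access_token():
        # fetch the default service account credentials, scoped to Cloud Platform
        credentials, _ = google.auth.default(
            scopes=['https://www.googleapis.com/auth/cloud-platform'])

        # refresh the credentials to populate a short-lived access token
        credentials.refresh(google.auth.transport.requests.Request())
        return credentials.token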
    

    4. Deploy your function and, when it is ready, test it by uploading your SUCCESS.txt file (or any other file, to confirm it is ignored).
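
    If you want to sanity-check the condition logic before relying on real uploads, here is a minimal local sketch that imports the main.py from step 3 and mocks the token lookup and the HTTP call (all environment variable values below are placeholders):

    import os
    from unittest import mock

    import main  # the main.py from step 3, saved in the same directory

    # placeholder values, replace with your own
    os.environ.update({
        'PROJECT_ID': 'my-project',
        'TOPIC_ID': 'my-topic',
        'PIPELINE_NAME': 'my-pipeline',
        'INSTANCE_ID': 'my-instance',
        'REGION': 'us-central1',
        'NAMESPACE_ID': 'default',
        'CDAP_ENDPOINT': 'https://my-instance.datafusion.googleusercontent.com/api',
    })

    with mock.patch.object(main, 'get_access_token', return_value='fake-token'), \
         mock.patch.object(main.requests, 'post') as fake_post:

        # an unrelated upload should not start the pipeline
        main.run_pipeline({'name': 'other_file.csv'}, None)
        assert not fake_post.called

        # the trigger file should produce exactly one start call
        main.run_pipeline({'name': 'SUCCESS.txt'}, None)
        assert fake_post.call_count == 1

    Only the SUCCESS.txt event should reach the Data Fusion API; the real token and endpoint are still exercised only in the deployed function.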

    I have tested it and it works fine (Based on this post).