Tags: bash, google-cloud-platform, airflow, google-cloud-composer

Referencing external file in Shell script using BashOperator


I have a shell script generate_rpt.sh that reads data from a CSV file and performs certain operations on each row, as shown below:

#!/bin/bash
src=$1
dst=$2
file_name=$3

# Read the file row by row, converting semicolons to commas before splitting
while IFS='' read -r line || [[ -n "$line" ]]; do
    line=${line//;/,}
    IFS=',' read -r -a columns <<< "$line"
    # do something with each row
done < "$file_name"

I have a BashOperator that calls the above shell script, defined as below:

execute_cmd = BashOperator(
    task_id="execute_cmd",
    provide_context=True,
    bash_command="scripts/generate_rpt.sh",
    params={'src':'src_prj','dst':'dst_prj','file_name':'rpt_config.csv'}
)

The CSV file rpt_config.csv and the shell script generate_rpt.sh are in the scripts folder.

DAG Folder location - /home/airflow/gcs/dags/sus

Scripts folder location - /home/airflow/gcs/dags/sus/scripts

I keep getting the error that rpt_config.csv is not found. I have tried giving the full file path, i.e. /home/airflow/gcs/dags/sus/scripts/rpt_config.csv, but even then I get the same error.

When I execute the shell script via Cloud Shell it works without any errors, but when it runs via the DAG it fails. Can someone help me understand how to handle this?
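
For reference, this is roughly how I invoke it in Cloud Shell, with the same three positional arguments the DAG is supposed to pass:

bash generate_rpt.sh src_prj dst_prj rpt_config.csv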

Airflow version - composer-1.16.6-airflow-1.10.15

Python version - 2


Solution

  • I was able to make your DAG and bash script work by defining the full path for both generate_rpt.sh and rpt_config.csv. BashOperator runs its command from a temporary working directory, so a relative path like scripts/rpt_config.csv will not resolve against the dags folder, and the params dictionary is only exposed to Jinja templating rather than passed to the script as arguments. I also followed your folder structure; see the location of the scripts below:

    DAG location: /home/airflow/gcs/dags/sus

    Scripts location: /home/airflow/gcs/dags/sus/scripts

    In my DAG I used the environment variable DAGS_FOLDER to get the path of the dags folder and used it to build the full path of the files. I also used bash to call the script and passed the parameters on the command line itself. See the DAG below:

    import datetime
    
    from airflow import models
    from airflow.operators import bash
    import os
    
    # Cloud Composer sets DAGS_FOLDER to the path of the mounted dags folder
    DAGS_FOLDER = os.environ["DAGS_FOLDER"]
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': YESTERDAY,
    }
    
    with models.DAG(
            'run_bash',
            catchup=False,
            default_args=default_args,
            schedule_interval=datetime.timedelta(days=1)) as dag:
    
        execute_cmd = bash.BashOperator(
            task_id='execute_cmd',
            # Positional arguments: $1=src, $2=dst, $3=full path to the CSV file
            bash_command=f'bash {DAGS_FOLDER}/sus/scripts/generate_rpt.sh src_prj dst_prj {DAGS_FOLDER}/sus/scripts/rpt_config.csv ',
        )
    
        execute_cmd
    

    Output:

    (screenshot: task log showing each row of rpt_config.csv printed)

    NOTE: I just updated your generate_rpt.sh to print the lines in the CSV file, hence the output above. I'm using Composer version 1.19.5 and Airflow version 2.2.5.
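
    A minimal sketch of that modification to generate_rpt.sh, assuming the loop simply echoes each converted row so it shows up in the task log:

    #!/bin/bash
    src=$1
    dst=$2
    file_name=$3

    # Print each row after converting semicolons to commas
    while IFS='' read -r line || [[ -n "$line" ]]; do
        line=${line//;/,}
        echo "$line"
    done < "$file_name"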
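
    If you would rather keep the values in params instead of hardcoding them into the command, bash_command is rendered with Jinja, so they can be referenced as {{ params.src }} and {{ params.dst }}. A sketch of that variant under the same file layout (the Jinja part is a plain string so its braces are not consumed by the f-string):

    execute_cmd = bash.BashOperator(
        task_id='execute_cmd',
        bash_command=f'bash {DAGS_FOLDER}/sus/scripts/generate_rpt.sh '
                     '{{ params.src }} {{ params.dst }} '
                     f'{DAGS_FOLDER}/sus/scripts/rpt_config.csv ',
        params={'src': 'src_prj', 'dst': 'dst_prj'},
    )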