Tags: python, airflow, google-cloud-dataflow, apache-beam, google-cloud-composer

Set Airflow Variable dynamically


Hi community, I need some help. I have a GCS bucket called "staging" that contains folders and subfolders.

The "date-folders" (eg. 20221128) may be several. Each date-folder has 3 subfolders: I'm interested in the "main_folder". The main_folder has 2 "project folders". Each project folder has several subfolders. Each of these last subfolder has a .txt file.

The main objective is:

  1. Obtain a list of all the paths to the .txt files (e.g. gs://staging/20221128/main_folder/project_1/subfold_1/file.txt, ...)
  2. Export the list to an Airflow Variable
  3. Use the "list" Variable to run some DAGs dynamically.

The folders in the staging bucket may vary every day, so I don't have static paths. I'm using Apache Beam with the Python SDK on Cloud Dataflow, and Airflow on Cloud Composer.

Is there a way to obtain the list of paths (like os.listdir() in Python) with Beam, and to schedule this workflow daily? (I need to overwrite the list Variable every day with the new paths.)

For example, I can achieve step 1 (locally) with the following Python script:

import os

def collect_paths(startpath="C:/Users/PycharmProjects/staging/"):
    list_paths = []
    for path, dirs, files in os.walk(startpath):
        for f in files:
            file_path = path + "/" + f
            list_paths.append(file_path)
    return list_paths

Thank you all in advance.

Edit n.1.

I've retrieved the file paths using the google.cloud.storage API in my collect_paths script. Now I want to access XCom and get the list of paths. This is my task instance:

    collect_paths_job = PythonOperator(
        task_id='collect_paths',
        python_callable=collect_paths,
        op_kwargs={'bucket_name': 'staging'},
        do_xcom_push=True
    )
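
Roughly, collect_paths now does something like the following (a sketch using the google.cloud.storage client with a .txt filter; the parameter and variable names are illustrative):

    from google.cloud import storage

    def collect_paths(bucket_name):
        """Return the gs:// paths of all .txt objects in the bucket (sketch)."""
        client = storage.Client()
        paths = []
        for blob in client.list_blobs(bucket_name):  # iterates over every object in the bucket
            if blob.name.endswith(".txt"):
                paths.append(f"gs://{bucket_name}/{blob.name}")
        return paths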

I want to iterate over the list in order to run (in the same DAG) N concurrent tasks, each processing a single file. I tried:

files = collect_paths_job.xcom_pull(task_ids='collect_paths', include_prior_dates=False)
for f in files:
    job = get_job_operator(f)
    chain(job)

But got the following error:

TypeError: xcom_pull() missing 1 required positional argument: 'context'

Solution

  • I would like to correct your usage of the term Variable. Airflow attributes a special meaning to this object. What you want is for the file info to be accessible as parameters in a task.

    Use XCom

    Assume your DAG has a Python task called list_files_from_gcs. This task simply runs the collect_paths function that you have written. Since that function returns a list, Airflow automatically pushes the result to XCom, so you can access this information in any subsequent task of your DAG.

    1. Your subsequent task can be another Python task in the same DAG, in which case you can access XCom very easily:

      from airflow.decorators import task

      @task
      def next_task(xyz, abc, **context):
          ti = context['ti']
          files_list = ti.xcom_pull(task_ids='list_files_from_gcs')
          ...
          ...
      
    2. If you are instead looking to trigger an entirely different DAG, then you can use TriggerDagRunOperator and pass this list as the dag_run conf, like this:

      TriggerDagRunOperator(
          conf={
              "gcs_files_list": "{{ task_instance.xcom_pull(task_ids='list_files_from_gcs') }}"
          },
          ....
          ....
      )
      

      Then your triggered DAG can just parse the DAG run config to move ahead.
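
      A minimal sketch of what that parsing might look like inside the triggered DAG (the process_files task name is hypothetical; depending on how the conf was templated, the value may arrive as a rendered string that needs parsing back into a list):

        from airflow.decorators import task

        @task
        def process_files(**context):
            # The conf passed by TriggerDagRunOperator is available on the dag_run object.
            files_list = context["dag_run"].conf.get("gcs_files_list", [])
            for path in files_list:
                ...  # process each gs:// path here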