Tags: python, airflow, parameter-passing, glob, file-watcher

Getting the path to the file located by FileSensor


I am building a DAG that waits for filesystem changes and then runs some analysis on newly appearing or modified files; for this I'm using a FileSensor. The path I'm monitoring contains both a Jinja template and a glob wildcard (*). When a file is found, I would like to pass its absolute path to the subsequent callbacks and tasks. The file's metadata will then be compared against some data structure to determine whether it needs to be processed.
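One way to picture that comparison is a registry keyed by path that remembers the size and modification time last seen for each file: a file needs processing if it is new or either value has changed. This is a minimal sketch under that assumption. FileRecord, needs_processing, mark_processed and the in-memory dict registry are hypothetical names, not part of Airflow, and a real DAG would have to persist the registry between runs.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class FileRecord:
    """Metadata remembered for an already-processed file (hypothetical)."""

    size: int
    mtime: float


def needs_processing(path: str, registry: dict[str, FileRecord]) -> bool:
    """Return True if `path` is new or its size/mtime differ from the stored record.

    Hypothetical helper: the registry is a plain dict here; a real DAG
    would need to persist it somewhere durable between runs.
    """
    stats = os.stat(path)
    current = FileRecord(size=stats.st_size, mtime=stats.st_mtime)
    # registry.get() returns None for unseen paths, which never equals a record.
    return registry.get(path) != current


def mark_processed(path: str, registry: dict[str, FileRecord]) -> None:
    """Store the file's current size and mtime after it has been processed."""
    stats = os.stat(path)
    registry[path] = FileRecord(size=stats.st_size, mtime=stats.st_mtime)
```

Size plus mtime is a cheap heuristic; it misses rewrites that leave both unchanged, for which a content hash would be more robust.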

The problem: how to "exfiltrate" the found file's path from the sensor? I looked at the source code of FileSensor, but it only logs the found file, without storing the path anywhere. Is there any way to use the path template and/or the context to reconstruct the path without doing additional filesystem queries?

I can think of a couple of workarounds, but before going down either path I want to make sure a workaround is actually necessary. My ideas are:

  1. Pass the data path template as-is to the subsequent task(s) and hope it works automagically if used in a PythonOperator.
  2. Force the rendering of the Jinja template through its context/environment, or by using BashOperator + echo, then re-query the filesystem.

Here's a simplified outline of my configuration:

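import os
from functools import partial

from airflow.sensors.filesystem import FileSensor
from pendulum import duration

# <DAG initialization code>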
    ...
    path_template: str = os.path.join(
        "/basepath/",
        "{{ data_interval_start.format('YYYYMMDD') }}",
        f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz"
    )
    fs: FileSensor = FileSensor(
        task_id=f"{source}_data_sensor",
        filepath=path_template,
        poke_interval=int(duration(minutes=5).total_seconds()),
        timeout=int(duration(hours=1, minutes=30).total_seconds()),
        mode="reschedule",
        pre_execute=log_execution,
        on_success_callback=partial(log_found_file, path_template),
    )
    fs >> convert(source) >> analyze_data(source)
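The quadrupled braces in the last path component are easy to misread. Inside an f-string, {{ and }} are escapes for literal braces, so {{{{ and }}}} come out as {{ and }}: Python substitutes only {source} and leaves a Jinja expression for Airflow to render later. The snippet below checks this in plain Python; source = "vendor_a" is a made-up value for illustration.

```python
import os

source = "vendor_a"  # hypothetical source name, for illustration only

path_template = os.path.join(
    "/basepath/",
    "{{ data_interval_start.format('YYYYMMDD') }}",
    # {{{{ / }}}} are f-string escapes that produce literal {{ / }} for Jinja.
    f"{source}.{{{{ data_interval_start.format('YYYYMMDD') }}}}*.csv.gz",
)

print(path_template)
# On POSIX this prints:
# /basepath/{{ data_interval_start.format('YYYYMMDD') }}/vendor_a.{{ data_interval_start.format('YYYYMMDD') }}*.csv.gz
```

Note that rendering only resolves the Jinja expressions: for a data interval starting on, say, 2024-01-15 the result is /basepath/20240115/vendor_a.20240115*.csv.gz, which is still a glob pattern. Because the * survives rendering, recovering the concrete file name requires matching the pattern against the filesystem again.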

Here, log_found_file is defined as follows:

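import os

from airflow.utils.context import Context

# logger is assumed to be a loguru logger, which provides .success().


def log_found_file(data_path_template: str, ctx: Context) -> None:
    """Logs the discovery of a data file."""
    data_path = f(data_path_template, ctx)  # <<<<<<<<<<<<<<< Need help with this
    stats: os.stat_result = os.stat(data_path)
    # Note: on Unix, st_ctime is the inode change time, not the creation time.
    logger.success(
        f"Detected data file {data_path} "
        f"of size {stats.st_size}; "
        f"created on {stats.st_ctime}; "
        f"and last modified on {stats.st_mtime}."
    )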

I'm working with Airflow 2.8.0 in case it matters.


Solution

Below is an implementation of the second workaround from the question: render the Jinja template with the task's own context, then glob the filesystem for the concrete path:

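    from glob import glob
    from airflow.utils.context import Context

    def log_found_file(data_path_template: str, ctx: Context) -> None:
        """Logs the discovery of a data file."""
        # Render the Jinja expressions with the task's context; the * wildcard survives.
        glob_str: str = ctx["task"].render_template(data_path_template, ctx)
        # glob() returns matches in arbitrary order, so sort for a deterministic pick.
        data_path: str = sorted(glob(glob_str))[0]
        ...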
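Because glob() returns matches in arbitrary order, taking the first element of its result is only deterministic after sorting, and even then "first" means alphabetically first. If several files can match the pattern and the newest one is the one wanted, a minimal sketch under that assumption is to pick the match with the latest modification time. resolve_latest_match is a hypothetical helper; it takes the already-rendered pattern (the output of render_template) and uses only the standard library, so it can be tested outside Airflow.

```python
import glob
import os


def resolve_latest_match(pattern: str) -> str:
    """Return the most recently modified file matching a rendered glob pattern.

    Hypothetical helper: assumes the newest match is the one to process.
    Raises FileNotFoundError if nothing matches (e.g. the file was removed
    between the sensor's poke and this callback).
    """
    # Keep regular files only; a directory could also match the pattern.
    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
    if not matches:
        raise FileNotFoundError(f"No file matches {pattern!r}")
    # Sort key (mtime, path) breaks mtime ties deterministically by name.
    return max(matches, key=lambda p: (os.path.getmtime(p), p))
```

Inside the callback this would be used as data_path = resolve_latest_match(ctx["task"].render_template(data_path_template, ctx)). It still performs one extra filesystem query, since the sensor does not expose the path it matched.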