python airflow etl airflow-taskflow

Airflow Sensor incomplete files list


I'm curious how Airflow sensors behave when a file has not been completely copied or loaded to the target data storage. For example: we have a filesystem, and a sensor checks for files in it. We copy a large file into a folder, which takes some time. Will the sensor pick up the incomplete file, or will it wait until the file is fully loaded?

I've been looking for an answer and haven't found anything similar.


Solution

  • The answer is in the source code of FileSensor (its poke method is reproduced below).

    In short, it depends on the operating system: the sensor only checks that the path exists and is a file, not that the file has been fully written. In a test I ran on a Mac, the sensor returned True while a large file was still downloading.

    The usual workaround is to copy the file under a temporary extension that the sensor's pattern does not match, then rename it to the real name once the copy has finished.

    # Excerpt from FileSensor; relies on os, datetime, glob and FSHook
    # being imported at module level.
    def poke(self, context: Context):
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath)
        self.log.info("Poking for file %s", full_path)

        for path in glob(full_path, recursive=self.recursive):
            # A regular file succeeds as soon as it exists; size and
            # completeness are never checked.
            if os.path.isfile(path):
                mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
                self.log.info("Found File %s last modified: %s", str(path), mod_time)
                return True

            # A directory succeeds if any file exists anywhere beneath it.
            for _, _, files in os.walk(path):
                if len(files) > 0:
                    return True
        return False
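
The temporary-extension idea can be tried out with the standard library alone. The following is only a minimal sketch, not part of Airflow: the paths, the `.part` suffix and the helper names `copy_then_rename` and `visible_to_sensor` are made up for illustration, and `visible_to_sensor` merely imitates the glob plus `os.path.isfile` check from the method above. It assumes the temporary file and the final file live on the same filesystem, so that the rename is a single step.

```python
import glob
import os
import shutil
import tempfile


def visible_to_sensor(pattern):
    """Imitate the file branch of poke: glob the pattern, keep regular files."""
    matches = (p for p in glob.glob(pattern) if os.path.isfile(p))
    return sorted(os.path.basename(p) for p in matches)


def copy_then_rename(src, dst, tmp_suffix=".part"):
    """Copy src next to dst under a temporary name, then rename it into place.

    tmp_suffix is a hypothetical convention; any suffix works as long as the
    sensor's filepath pattern does not match it.
    """
    tmp_path = dst + tmp_suffix
    shutil.copyfile(src, tmp_path)
    # Assumption: tmp_path and dst are on the same filesystem, so on POSIX the
    # rename happens in one step and readers never see a half-written dst.
    os.replace(tmp_path, dst)
    return dst


def demo():
    with tempfile.TemporaryDirectory() as workdir:
        landing = os.path.join(workdir, "landing")
        os.mkdir(landing)
        pattern = os.path.join(landing, "*.csv")

        source = os.path.join(workdir, "source.csv")
        with open(source, "wb") as f:
            f.write(b"id,value\n1,10\n")

        # 1. Writing straight to the final name: matched while still incomplete.
        direct = os.path.join(landing, "direct.csv")
        with open(direct, "wb") as f:
            f.write(b"id,value\n1,")  # deliberately cut short
            f.flush()
            seen_direct = visible_to_sensor(pattern)
        os.remove(direct)

        # 2. An in-progress copy under the temporary name: not matched.
        with open(os.path.join(landing, "report.csv.part"), "wb") as f:
            f.write(b"id,value\n1,")
            f.flush()
            seen_during = visible_to_sensor(pattern)

        # 3. Finished copy renamed into place: matched, and complete.
        copy_then_rename(source, os.path.join(landing, "report.csv"))
        seen_after = visible_to_sensor(pattern)

    return seen_direct, seen_during, seen_after


print(demo())  # (['direct.csv'], [], ['report.csv'])
```

With this convention, the sensor's filepath would point at the final pattern (for example `*.csv`), so files that still carry the temporary suffix are ignored until the rename happens.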