Tags: python, airflow

Is there a way to remove zombie tasks in Airflow?


I'm writing an Airflow DAG that needs to convert .tsv files to .parquet, but I get this error:

ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/formatting_data.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'formatting_data', 'Task Id': 'format_imdb_data', 'Run Id': 'scheduled__2024-05-11T00:00:00+00:00', 'Hostname': 'dd0bbc46b6fa'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0xffff937ba850>, 'is_failure_callback': True}

and my DAG run stops partway through.

I wrote a class that manages my data files:

import os
import shutil

import pandas as pd


class FileHandler:
    def convert_csv_to_parquet(self, csv_file):
        # Load the whole CSV into memory, then write it back out as Parquet.
        df = pd.read_csv(csv_file)
        parquet_file = csv_file.replace(".csv", ".parquet")
        df.to_parquet(parquet_file)
        return df

    def convert_tsv_to_parquet(self, tsv_file):
        # Same idea for tab-separated files; the source file is removed afterwards.
        df = pd.read_csv(tsv_file, sep='\t')
        parquet_file = tsv_file.replace(".tsv", ".parquet")
        df.to_parquet(parquet_file)
        os.remove(tsv_file)
        return df

    def list_files_in_directory(self, directory, extension='.tsv'):
        # Recursively collect every file under `directory` matching the extension.
        matching_files = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith(extension):
                    matching_files.append(os.path.join(root, file))
        return matching_files

    def remove_empty_directory(self, path):
        # Walk bottom-up so child directories are checked before their parents.
        for root, dirs, _ in os.walk(path, topdown=False):
            for directory in dirs:
                directory_path = os.path.join(root, directory)
                if not os.listdir(directory_path):
                    os.rmdir(directory_path)

    def move_files(self, files, destination, remove_dir_flags=True):
        for file in files:
            shutil.move(file, destination)

        if remove_dir_flags:
            self.remove_empty_directory(destination)

My DAG is:

from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import sys

# Make the shared libs directory importable from inside the DAG file
LIBS_PATH = os.path.join('/opt/airflow', 'libs')
if LIBS_PATH not in sys.path:
    sys.path.insert(0, LIBS_PATH)


from preprocessing.formatting.file_handler import FileHandler
from utils.logs_manager import LogManager
from utils.path_manager import PathManager

file_handler = FileHandler()
lm = LogManager()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'formatting_data',
    default_args=default_args,
    description='DAG used to format data in datalake',
    schedule_interval='@daily',
    start_date=datetime(2024, 5, 7),
    catchup=False,
    tags=["formatting", "preprocessing"]
)


def format_imdb_files():
    # Convert every raw IMDb .tsv file to Parquet, deleting each .tsv afterwards.
    pm = PathManager('/opt/airflow/datalake')
    imdb_path = pm.get_local_path('raw', 'imdb')
    for file in os.listdir(imdb_path):
        if file.endswith('.tsv'):
            file_path = os.path.join(imdb_path, file)
            df = pd.read_csv(file_path, sep='\t')
            parquet_file = file_path.replace(".tsv", ".parquet")
            df.to_parquet(parquet_file)
            os.remove(file_path)
            del df

format_imdb_data = PythonOperator(
    task_id='format_imdb_data',
    python_callable=format_imdb_files,
    dag=dag,
)

format_imdb_data

I don't understand why I get this error or how to solve it. Thanks a lot for your help!


Solution

  • In Airflow, the worker supervises the running task and sends periodic heartbeats while it waits for it to finish. If the connection between the worker and the scheduler drops, or the worker dies unexpectedly while the task is still running, the heartbeats stop and the scheduler flags the task instance as a zombie.

    I think in your case format_imdb_files() is simply taking too long to complete, so the task gets flagged before it finishes. Increase the scheduler setting scheduler_zombie_task_threshold from its default of 5 minutes to somewhere around ~20–30 minutes, depending on how long the conversion actually takes.
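
    For reference, here is a minimal sketch of how that change could be applied. The 1800-second value is just an assumption for a ~30-minute threshold, and the option name can differ between Airflow releases (newer versions may expose it as zombie_task_threshold), so check the configuration reference for your version:

    # airflow.cfg -- raise zombie detection from the 300 s default to 30 minutes
    [scheduler]
    scheduler_zombie_task_threshold = 1800

    # or, equivalently, as an environment variable (handy in docker-compose setups)
    AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=1800

    After restarting the scheduler, you can confirm the effective value with airflow config get-value scheduler scheduler_zombie_task_threshold.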