I'm making an Airflow program that needs to convert .tsv files to .parquet, but I get this error:
ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/formatting_data.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'formatting_data', 'Task Id': 'format_imdb_data', 'Run Id': 'scheduled__2024-05-11T00:00:00+00:00', 'Hostname': 'dd0bbc46b6fa'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0xffff937ba850>, 'is_failure_callback': True}
and my DAG stops partway through the run.
I made a class that manages my files:
import os
import shutil

import pandas as pd


class FileHandler:
    def convert_csv_to_parquet(self, csv_file):
        df = pd.read_csv(csv_file)
        parquet_file = csv_file.replace(".csv", ".parquet")
        df.to_parquet(parquet_file)
        return df

    def convert_tsv_to_parquet(self, tsv_file):
        df = pd.read_csv(tsv_file, sep='\t')
        parquet_file = tsv_file.replace(".tsv", ".parquet")
        df.to_parquet(parquet_file)
        os.remove(tsv_file)
        return df

    def list_files_in_directory(self, directory, extension='.tsv'):
        tsv_files = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith(extension):
                    tsv_files.append(os.path.join(root, file))
        return tsv_files

    def remove_empty_directory(self, path):
        # Walk bottom-up so nested empty directories are removed first
        for root, dirs, _ in os.walk(path, topdown=False):
            for directory in dirs:
                directory_path = os.path.join(root, directory)
                if not os.listdir(directory_path):
                    os.rmdir(directory_path)

    def move_files(self, files, destination, remove_dir_flags=True):
        for file in files:
            shutil.move(file, destination)
        if remove_dir_flags:
            self.remove_empty_directory(destination)
My DAG is:
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import sys

LIBS_PATH = os.path.join('/opt/airflow', 'libs')
if LIBS_PATH not in sys.path:
    sys.path.insert(0, LIBS_PATH)

from preprocessing.formatting.file_handler import FileHandler
from utils.logs_manager import LogManager
from utils.path_manager import PathManager

file_handler = FileHandler()
lm = LogManager()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'formatting_data',
    default_args=default_args,
    description='DAG used to format data in datalake',
    schedule_interval='@daily',
    start_date=datetime(2024, 5, 7),
    catchup=False,
    tags=["formatting", "preprocessing"]
)

def format_imdb_files():
    pm = PathManager('/opt/airflow/datalake')
    imdb_path = pm.get_local_path('raw', 'imdb')
    for file in os.listdir(imdb_path):
        if file.endswith('.tsv'):
            file_path = os.path.join(imdb_path, file)
            df = pd.read_csv(file_path, sep='\t')
            parquet_file = file_path.replace(".tsv", ".parquet")
            df.to_parquet(parquet_file)
            os.remove(file_path)
            del df

format_imdb_data = PythonOperator(
    task_id='format_imdb_data',
    python_callable=format_imdb_files,
    dag=dag,
)

format_imdb_data
I don't understand why I get this error or how to solve it. Thanks a lot for your help!
In Airflow, the worker heartbeats the task while it runs, and the scheduler watches those heartbeats. If the connection between the worker and the scheduler fails, or the worker process dies unexpectedly while the task is still running, the heartbeats stop and the scheduler flags the task as a zombie.

I think in your case format_imdb_files() is taking too long to complete (loading large TSVs into pandas can also exhaust memory and get the process killed, which produces the same symptom), so no heartbeat arrives within the deadline. Try increasing the scheduler setting scheduler_zombie_task_threshold from the default 5 minutes to somewhere around 20–30 minutes, depending on how long the conversion actually takes.
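For example, on Airflow 2.x this setting lives in the [scheduler] section of airflow.cfg; a sketch, assuming a 30-minute threshold fits your workload:

[scheduler]
# seconds without a heartbeat before a task is declared a zombie
# (the default is 300, i.e. 5 minutes)
scheduler_zombie_task_threshold = 1800

If you run Airflow in Docker (your hostname looks like a container ID), the equivalent environment variable is AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=1800. Restart the scheduler afterwards for the change to take effect.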