I'm new to Airflow and I'm trying to run a job on an EC2 instance using Airflow's SSHOperator, as shown below:
t2 = SSHOperator(
    ssh_conn_id='ec2_ssh_connection',
    task_id='execute_script',
    command="nohup python test.py &",
    retries=3,
    dag=dag)
The job takes a few hours, and I want Airflow to execute the Python script and end. However, when the command is executed and the DAG completes, the script is terminated on the EC2 instance. I also noticed that the above code doesn't create a nohup.out file.
I'm looking at how to run nohup using SSHOperator. It seems like this might be a Python-related issue, because I'm getting the following error on the EC2 instance when the nohup command is executed:
[Errno 32] Broken pipe
Thanks!
Airflow's SSHHook uses the Paramiko module for SSH connectivity. There is an SO question regarding Paramiko and nohup. One of the answers suggests adding sleep after the nohup command. I cannot explain exactly why, but it actually works. It is also necessary to set get_pty=True in SSHOperator.
Here is a complete example that demonstrates the solution:
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'start_date': datetime(2001, 2, 3, 4, 0),
}

with DAG(
    'a_dag', schedule_interval=None, default_args=default_args, catchup=False,
) as dag:
    op = SSHOperator(
        task_id='ssh',
        ssh_conn_id='ssh_default',
        command=(
            'nohup python -c "import time;time.sleep(30);print(1)" & sleep 10'
        ),
        get_pty=True,  # This is needed!
    )
The nohup.out file is written to the user's $HOME.
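If you would rather have the output land in a predictable place than rely on nohup.out in the remote user's $HOME, you can redirect stdout and stderr yourself inside the command. A minimal sketch of that variant follows; the /tmp/long_job.log path is only an illustrative choice, and the task is meant to sit inside the same with DAG block as op above:

    op_with_log = SSHOperator(
        task_id='ssh_with_explicit_log',
        ssh_conn_id='ssh_default',
        command=(
            # Redirect stdout and stderr to an explicit file so the log
            # location does not depend on the remote user's $HOME.
            'nohup python -c "import time;time.sleep(30);print(1)" '
            '> /tmp/long_job.log 2>&1 & sleep 10'
        ),
        get_pty=True,  # still required, as in the example above
    )

Keep the trailing sleep and get_pty=True exactly as in the original example; only the output destination changes.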