Tags: airflow, airflow-2.x

Airflow 2.7.1: SSHOperator FileNotFoundError


I've set up Airflow on Docker based on the official docs, and I'm attempting to test one of my SSH connections via this code:

from airflow.decorators import dag
from airflow.providers.ssh.operators.ssh import SSHOperator

from datetime import datetime, timezone

description = "used to validate/debug ssh connections"
tags = ["debug", "joe"]
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
schedule = '0 0 * * *'
catchup = False

@dag(description=description, tags=tags, start_date=start_date, schedule=schedule, catchup=catchup)
def debug_ssh_conn_dag():
    
    # Used to validate the connection
    validate_conn = SSHOperator(
        task_id="validate_conn",
        ssh_conn_id="ssh_test",
        command="echo 'hello world'"
    )
    
    validate_conn  # single task, so no dependencies to chain

debug_ssh_conn_dag()

My connection (some details redacted) is configured in the Airflow UI; the screenshot is omitted here.
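
Since the screenshot is omitted, here is a hedged sketch of the general shape of such a connection, declared as an environment variable rather than through the UI; target-vm is a placeholder, and key_file is the extra that tells the SSH hook which private key to load:

# Hypothetical sketch only: an SSH connection declared as an env var
# under x-airflow-common in docker-compose.yaml. "target-vm" is a
# placeholder host; the login is left out here, which becomes relevant later.
x-airflow-common:
  environment:
    AIRFLOW_CONN_SSH_TEST: "ssh://target-vm:22?key_file=/home/ubuntu/.ssh/id_rsa"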

However, I'm consistently getting this error: FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'

I've tried using SSHHook directly as well, to no avail. SSH-ing manually from my Airflow VM to the target VM works fine, so the key itself is valid.

Is there something I'm doing wrong? I appreciate all the help I can get.

Full Logs:

*** Found local files:
***   * /opt/airflow/logs/dag_id=debug_ssh_conn_dag/run_id=manual__2023-10-06T06:41:18.437511+00:00/task_id=validate_conn/attempt=1.log
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [queued]>
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [queued]>
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1380} INFO - Executing <Task(SSHOperator): validate_conn> on 2023-10-06 06:41:18.437511+00:00
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:57} INFO - Started process 247141 to run task
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'debug_ssh_conn_dag', 'validate_conn', 'manual__2023-10-06T06:41:18.437511+00:00', '--job-id', '162', '--raw', '--subdir', 'DAGS_FOLDER/debug_ssh_conn_dag.py', '--cfg-path', '/tmp/tmp207d3qe9']
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:85} INFO - Job 162: Subtask validate_conn
[2023-10-06, 06:41:18 UTC] {task_command.py:415} INFO - Running <TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [running]> on host 639c4d0c8698
[2023-10-06, 06:41:19 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='debug_ssh_conn_dag' AIRFLOW_CTX_TASK_ID='validate_conn' AIRFLOW_CTX_EXECUTION_DATE='2023-10-06T06:41:18.437511+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-10-06T06:41:18.437511+00:00'
[2023-10-06, 06:41:19 UTC] {ssh.py:135} INFO - Creating ssh_client
[2023-10-06, 06:41:19 UTC] {ssh.py:112} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2023-10-06, 06:41:19 UTC] {base.py:73} INFO - Using connection ID 'ssh_pandai_ai' for task execution.
[2023-10-06, 06:41:19 UTC] {ssh.py:300} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2023-10-06, 06:41:19 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:19 UTC] {ssh.py:341} INFO - Failed to connect. Sleeping before retry attempt 1
[2023-10-06, 06:41:23 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:23 UTC] {ssh.py:341} INFO - Failed to connect. Sleeping before retry attempt 2
[2023-10-06, 06:41:28 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:28 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 172, in execute
    with self.get_ssh_client() as ssh_client:
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 136, in get_ssh_client
    return self.get_hook().get_conn()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 345, in get_conn
    for attempt in Retrying(
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 347, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 352, in get_conn
    client.connect(**connect_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 485, in connect
    self._auth(
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 730, in _auth
    key = self._key_from_filepath(
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 638, in _key_from_filepath
    key = klass.from_private_key_file(key_path, password)
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/pkey.py", line 421, in from_private_key_file
    key = cls(filename=filename, password=password)
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/rsakey.py", line 64, in __init__
    self._from_private_key_file(filename, password)
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/rsakey.py", line 196, in _from_private_key_file
    data = self._read_private_key_file("RSA", filename, password)
  File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/pkey.py", line 494, in _read_private_key_file
    with open(filename, "r") as f:
FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'
[2023-10-06, 06:41:28 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=debug_ssh_conn_dag, task_id=validate_conn, execution_date=20231006T064118, start_date=20231006T064118, end_date=20231006T064128
[2023-10-06, 06:41:28 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 162 for task validate_conn ([Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'; 247141)
[2023-10-06, 06:41:28 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-10-06, 06:41:28 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check

To summarize: I attempted to test an SSH connection via SSHOperator (the key was validated by SSH-ing directly from the Airflow VM to the target VM), but got a FileNotFoundError.


Solution

  • I've resolved this by mounting

    /home/ubuntu/.ssh:/home/ubuntu/.ssh

    into the volumes list of x-airflow-common in docker-compose.yaml; see the sketch below. The connection points at /home/ubuntu/.ssh/id_rsa, which existed on the host but not inside the Airflow containers, which is why paramiko could not find the key.
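
    A minimal sketch of that change, assuming the stock docker-compose.yaml from the Airflow docs (only the last volume entry is new):

    # docker-compose.yaml (sketch, based on the stock file from the
    # Airflow docs): mount the host's .ssh directory so paramiko can
    # find the key at the path stored in the connection.
    x-airflow-common:
      &airflow-common
      volumes:
        - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
        - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
        - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
        - /home/ubuntu/.ssh:/home/ubuntu/.ssh  # new: makes id_rsa visible in the containers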

    After this I hit an additional error: ValueError: q must be exactly 160, 224, or 256 bits long.

    That one was resolved by adding a username to the Airflow SSH connection, as sketched below.
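
    For completeness, a hedged sketch of the working connection with the login filled in; the same fields can equally be set on the connection form in the UI:

    # Hypothetical sketch: the same connection expressed as an env var,
    # now with a login set, which is what cleared the ValueError here.
    # "ubuntu" and "target-vm" are assumptions, not confirmed values.
    x-airflow-common:
      environment:
        AIRFLOW_CONN_SSH_TEST: "ssh://ubuntu@target-vm:22?key_file=/home/ubuntu/.ssh/id_rsa"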