I've set up Airflow on Docker based on their docs, and I'm trying to test one of my SSH connections with this code:
from airflow.decorators import dag
from airflow.providers.ssh.operators.ssh import SSHOperator
from datetime import datetime, timezone

description = "used to validate/debug ssh connections"
tags = ["debug", "joe"]
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)
schedule = '0 0 * * *'
catchup = False

@dag(description=description, tags=tags, start_date=start_date, schedule=schedule, catchup=catchup)
def debug_ssh_conn_dag():
    # Used to validate the connection
    validate_conn = SSHOperator(
        task_id="validate_conn",
        ssh_conn_id="ssh_test",
        command="echo 'hello world'"
    )
    validate_conn

debug_ssh_conn_dag()
My connection is configured in the Airflow UI (screenshot omitted; some info redacted).
However, I'm consistently getting this error:
FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'
I've tried using an SSHHook as well, but to no avail. SSH-ing directly from my Airflow VM to the target VM works fine.
Is there something I'm doing wrong? I appreciate any help I can get.
Full Logs:
*** Found local files:
*** * /opt/airflow/logs/dag_id=debug_ssh_conn_dag/run_id=manual__2023-10-06T06:41:18.437511+00:00/task_id=validate_conn/attempt=1.log
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [queued]>
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [queued]>
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2023-10-06, 06:41:18 UTC] {taskinstance.py:1380} INFO - Executing <Task(SSHOperator): validate_conn> on 2023-10-06 06:41:18.437511+00:00
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:57} INFO - Started process 247141 to run task
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'debug_ssh_conn_dag', 'validate_conn', 'manual__2023-10-06T06:41:18.437511+00:00', '--job-id', '162', '--raw', '--subdir', 'DAGS_FOLDER/debug_ssh_conn_dag.py', '--cfg-path', '/tmp/tmp207d3qe9']
[2023-10-06, 06:41:18 UTC] {standard_task_runner.py:85} INFO - Job 162: Subtask validate_conn
[2023-10-06, 06:41:18 UTC] {task_command.py:415} INFO - Running <TaskInstance: debug_ssh_conn_dag.validate_conn manual__2023-10-06T06:41:18.437511+00:00 [running]> on host 639c4d0c8698
[2023-10-06, 06:41:19 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='debug_ssh_conn_dag' AIRFLOW_CTX_TASK_ID='validate_conn' AIRFLOW_CTX_EXECUTION_DATE='2023-10-06T06:41:18.437511+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-10-06T06:41:18.437511+00:00'
[2023-10-06, 06:41:19 UTC] {ssh.py:135} INFO - Creating ssh_client
[2023-10-06, 06:41:19 UTC] {ssh.py:112} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2023-10-06, 06:41:19 UTC] {base.py:73} INFO - Using connection ID 'ssh_pandai_ai' for task execution.
[2023-10-06, 06:41:19 UTC] {ssh.py:300} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
[2023-10-06, 06:41:19 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:19 UTC] {ssh.py:341} INFO - Failed to connect. Sleeping before retry attempt 1
[2023-10-06, 06:41:23 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:23 UTC] {ssh.py:341} INFO - Failed to connect. Sleeping before retry attempt 2
[2023-10-06, 06:41:28 UTC] {transport.py:1893} INFO - Connected (version 2.0, client OpenSSH_8.2p1)
[2023-10-06, 06:41:28 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 172, in execute
with self.get_ssh_client() as ssh_client:
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 136, in get_ssh_client
return self.get_hook().get_conn()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 345, in get_conn
for attempt in Retrying(
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 347, in __iter__
do = self.iter(retry_state=retry_state)
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/ssh/hooks/ssh.py", line 352, in get_conn
client.connect(**connect_kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 485, in connect
self._auth(
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 730, in _auth
key = self._key_from_filepath(
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/client.py", line 638, in _key_from_filepath
key = klass.from_private_key_file(key_path, password)
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/pkey.py", line 421, in from_private_key_file
key = cls(filename=filename, password=password)
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/rsakey.py", line 64, in __init__
self._from_private_key_file(filename, password)
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/rsakey.py", line 196, in _from_private_key_file
data = self._read_private_key_file("RSA", filename, password)
File "/home/airflow/.local/lib/python3.8/site-packages/paramiko/pkey.py", line 494, in _read_private_key_file
with open(filename, "r") as f:
FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'
[2023-10-06, 06:41:28 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=debug_ssh_conn_dag, task_id=validate_conn, execution_date=20231006T064118, start_date=20231006T064118, end_date=20231006T064128
[2023-10-06, 06:41:28 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 162 for task validate_conn ([Errno 2] No such file or directory: '/home/ubuntu/.ssh/id_rsa'; 247141)
[2023-10-06, 06:41:28 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-10-06, 06:41:28 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
I've resolved this by mounting

/home/ubuntu/.ssh:/home/ubuntu/.ssh

as a volume under x-airflow-common in the docker-compose file. The key file only existed on the host VM, not inside the Airflow containers, so the task (which runs inside a container) couldn't read it.
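For reference, the relevant part of my docker-compose.yaml now looks roughly like this. This is a sketch: the image tag and the dags/logs mounts follow the official compose file and may differ in your setup; the added .ssh line is the actual fix.

```yaml
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.7.1   # placeholder tag; use whatever your setup pins
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    # The fix: expose the host's SSH keys inside the containers at the
    # same path the Airflow connection's key_file points to.
    - /home/ubuntu/.ssh:/home/ubuntu/.ssh
```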
After this I got an additional error:

ValueError: q must be exactly 160, 224, or 256 bits long

which was resolved by adding a username to the Airflow SSH connection.
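As an aside, the whole connection (host, username, and key file together) can also be defined as a URI-style environment variable instead of through the UI. A sketch, where the host 10.0.0.5 is a placeholder and the slashes in the key path are percent-encoded as Airflow's URI format requires:

```shell
# Placeholder host; the conn id "ssh_test" matches ssh_conn_id in the DAG.
# key_file must be the path as seen from *inside* the container.
export AIRFLOW_CONN_SSH_TEST='ssh://ubuntu@10.0.0.5?key_file=%2Fhome%2Fubuntu%2F.ssh%2Fid_rsa'
```

Airflow resolves `AIRFLOW_CONN_<CONN_ID>` environment variables before looking in the metadata database, which makes this handy for container setups where you'd rather not click through the UI.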