I need to upload a file via SFTP to an external server through Cloud Composer. The code for the task is as follows:
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

def make_sftp():
    import paramiko
    import pysftp
    import os
    from airflow.contrib.hooks.ssh_hook import SSHHook
    import subprocess

    ssh_hook = SSHHook(ssh_conn_id="conn_id")
    sftp_client = ssh_hook.get_conn().open_sftp()
    return 0

etl_dag = DAG("dag_test",
              start_date=datetime.now(tz=local_tz),
              schedule_interval=None,
              default_args={
                  "owner": "airflow",
                  "depends_on_past": False,
                  "email_on_failure": False,
                  "email_on_retry": False,
                  "retries": 5,
                  "retry_delay": timedelta(minutes=5)})

sftp = PythonVirtualenvOperator(task_id="sftp",
                                python_callable=make_sftp,
                                requirements=["sshtunnel", "paramiko"],
                                dag=etl_dag)

start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)

start_pipeline >> sftp
In "conn_id" I have used the following options: {"no_host_key_check": "true"}, the DAG runs for a couple of seconds and the fail with the following message:
WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks
[2022-02-10 10:01:59,358] {ssh_hook.py:171} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
Traceback (most recent call last):
  File "/tmp/venvur4zvddz/script.py", line 23, in <module>
    res = make_sftp(*args, **kwargs)
  File "/tmp/venvur4zvddz/script.py", line 19, in make_sftp
    sftp_client = ssh_hook.get_conn().open_sftp()
  File "/usr/local/lib/airflow/airflow/contrib/hooks/ssh_hook.py", line 194, in get_conn
    client.connect(**connect_kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/paramiko/client.py", line 412, in connect
    server_key = t.get_remote_server_key()
  File "/opt/python3.6/lib/python3.6/site-packages/paramiko/transport.py", line 834, in get_remote_server_key
    raise SSHException("No existing session")
paramiko.ssh_exception.SSHException: No existing session
Do I have to set other options? Thank you!
To SSH into the host as the user “user_a”, an SSH key pair should be generated for that user and the public key should be added to the host machine. The following are the steps to create an SSH connection to the “user_a” user, which has the required write permissions.
ssh-keygen -t rsa -f ~/.ssh/sftp-ssh-key -C user_a
“sftp-ssh-key” → Name of the pair of public and private keys (Public key: sftp-ssh-key.pub, Private key: sftp-ssh-key)
“user_a” → User in the VM that we are trying to connect to
chmod 400 ~/.ssh/sftp-ssh-key
Now, copy the contents of the public key sftp-ssh-key.pub into ~/.ssh/authorized_keys of your host system. Check for the necessary permissions on authorized_keys and grant them accordingly using chmod.
I tested the setup with a Compute Engine VM. In the Compute Engine console, edit the VM settings to add the contents of the generated SSH public key to the instance metadata. Detailed instructions can be found here. If you are connecting to a Compute Engine VM, make sure the instance has the appropriate firewall rule to allow the SSH connection.
Upload the private key to the client machine. In this scenario, the client is the Airflow DAG, so the key file should be accessible from the Composer/Airflow environment. To make the key file accessible, it has to be uploaded to the GCS bucket associated with the Composer environment. For example, if the private key is uploaded to the data folder in the bucket, the key file path would be /home/airflow/gcs/data/sftp-ssh-key.
If password authentication is not configured on the host machine, follow the steps below to enable it.
Set a password for the user: sudo passwd user_a
In the sshd_config file /etc/ssh/sshd_config, change PasswordAuthentication no to PasswordAuthentication yes. After making that change, restart the SSH service by running the following command as root: sudo service ssh restart
Password authentication has now been configured.
Create a connection in Airflow with the configuration below, or use an existing connection.
The Extra JSON dictionary would look like this. Here, we have uploaded the private key file to the data folder in the Composer environment's GCS bucket.
{
    "key_file": "/home/airflow/gcs/data/sftp-ssh-key",
    "conn_timeout": "30",
    "look_for_keys": "false"
}
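The same connection can also be created programmatically instead of through the Airflow UI. The sketch below uses Airflow's Connection model; the connection id matches the one used in the DAG further down, while the host and login are placeholders:

import json

from airflow import settings
from airflow.models import Connection

# Placeholder host; replace with the address of the SFTP server.
conn = Connection(conn_id="sftp_connection",
                  conn_type="ssh",
                  host="203.0.113.10",
                  login="user_a",
                  extra=json.dumps({"key_file": "/home/airflow/gcs/data/sftp-ssh-key",
                                    "conn_timeout": "30",
                                    "look_for_keys": "false"}))

# Persist the connection in the Airflow metadata database.
session = settings.Session()
session.add(conn)
session.commit()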
If the host machine is configured to allow password authentication, these are the changes to be made in the Airflow connection.
The Extra parameter can be empty. The Password parameter is user_a's password on the host machine.
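The password-based connection can also be supplied through an environment variable instead of the UI, since Airflow resolves variables named AIRFLOW_CONN_<CONN_ID> as connection URIs. A minimal sketch, with placeholder host and password; in practice this would be set as a Composer environment variable rather than in code:

import os

# Equivalent connection in URI form (scheme "ssh" maps to the SSH connection type).
os.environ["AIRFLOW_CONN_SFTP_CONNECTION"] = "ssh://user_a:user_a_password@203.0.113.10:22"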
The task logs show that the password authentication was successful.
INFO - Authentication (password) successful!
The key validation issue appears to be caused by the paramiko=2.9.2 library. I tried downgrading paramiko, but the older versions do not seem to support OPENSSH keys. I found an alternative, paramiko-ng, in which the validation issue has been fixed, and changed the Python dependency from paramiko to paramiko-ng in the PythonVirtualenvOperator.
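Before wiring the key into the DAG, a quick way to confirm that the installed paramiko (or paramiko-ng) version can parse the generated private key is a check along these lines (a minimal sketch; the key path assumes the earlier steps):

import paramiko

key_path = "/home/airflow/gcs/data/sftp-ssh-key"
try:
    # Older paramiko releases raise SSHException for keys in the newer OpenSSH format.
    paramiko.RSAKey.from_private_key_file(key_path)
    print("Private key parsed successfully")
except paramiko.ssh_exception.SSHException as exc:
    print("Private key could not be parsed: %s" % exc)

The updated DAG with the paramiko-ng dependency: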
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

def make_sftp():
    import paramiko
    from airflow.contrib.hooks.ssh_hook import SSHHook

    ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
    sftp_client = ssh_hook.get_conn().open_sftp()
    print("=================SFTP Connection Successful=================")
    remote_host = "/home/sftp-folder/sample_sftp_file"      # file path on the host system
    local_host = "/home/airflow/gcs/data/sample_sftp_file"  # file path on the client system
    sftp_client.get(remote_host, local_host)                # GET operation to copy the file from host to client
    sftp_client.close()
    return 0

etl_dag = DAG("sftp_dag",
              start_date=datetime.now(),
              schedule_interval=None,
              default_args={
                  "owner": "airflow",
                  "depends_on_past": False,
                  "email_on_failure": False,
                  "email_on_retry": False,
                  "retries": 5,
                  "retry_delay": timedelta(minutes=5)})

sftp = PythonVirtualenvOperator(task_id="sftp",
                                python_callable=make_sftp,
                                requirements=["sshtunnel", "paramiko-ng", "pysftp"],
                                dag=etl_dag)

start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)

start_pipeline >> sftp
The sample_sftp_file has been copied from the host system to the specified Composer bucket.
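The original question was about uploading a file to the external server; the same SFTP client handles that direction with its put method. A minimal sketch to drop into make_sftp (paths are placeholders):

# Inside make_sftp(), after sftp_client has been opened:
local_path = "/home/airflow/gcs/data/file_to_upload"   # file staged in the Composer bucket (client)
remote_path = "/home/sftp-folder/file_to_upload"       # destination path on the external server (host)
sftp_client.put(local_path, remote_path)               # PUT operation to copy the file from client to host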