
SFTP with Google Cloud Composer


I need to upload a file via SFTP to an external server through Cloud Composer. The code for the task is as follows:


    from airflow import DAG
    from airflow.operators.python_operator import PythonVirtualenvOperator
    from airflow.operators.dummy_operator import DummyOperator
    from datetime import datetime, timedelta
    
    
    def make_sftp():
        import paramiko
        import pysftp
        import os
        from airflow.contrib.hooks.ssh_hook import SSHHook
        import subprocess
    
    
        ssh_hook = SSHHook(ssh_conn_id="conn_id")
        sftp_client = ssh_hook.get_conn().open_sftp()
    
        return 0
    
    
    etl_dag = DAG("dag_test",
                  start_date=datetime.now(tz=local_tz),
                  schedule_interval=None,
                  default_args={
                      "owner": "airflow",
                      "depends_on_past": False,
                      "email_on_failure": False,
                      "email_on_retry": False,
                      "retries": 5,
                      "retry_delay": timedelta(minutes=5)})
    
    sftp = PythonVirtualenvOperator(task_id="sftp",
                                    python_callable=make_sftp,
                                    requirements=["sshtunnel", "paramiko"],
                                    dag=etl_dag)
    
    start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
    
    start_pipeline >> sftp

In "conn_id" I have used the following options: {"no_host_key_check": "true"}, the DAG runs for a couple of seconds and the fail with the following message:

    WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks
    [2022-02-10 10:01:59,358] {ssh_hook.py:171} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
    Traceback (most recent call last):
      File "/tmp/venvur4zvddz/script.py", line 23, in <module>
        res = make_sftp(*args, **kwargs)
      File "/tmp/venvur4zvddz/script.py", line 19, in make_sftp
        sftp_client = ssh_hook.get_conn().open_sftp()
      File "/usr/local/lib/airflow/airflow/contrib/hooks/ssh_hook.py", line 194, in get_conn
        client.connect(**connect_kwargs)
      File "/opt/python3.6/lib/python3.6/site-packages/paramiko/client.py", line 412, in connect
        server_key = t.get_remote_server_key()
      File "/opt/python3.6/lib/python3.6/site-packages/paramiko/transport.py", line 834, in get_remote_server_key
        raise SSHException("No existing session")
    paramiko.ssh_exception.SSHException: No existing session

Do I have to set other options? Thank you!


Solution

  • Configuring the SSH connection with key pair authentication

    To SSH into the host as a user with username “user_a”, an SSH key pair should be generated for that user and the public key should be added to the host machine. The following steps create an SSH connection to the “user_a” user, which has the required write permissions.

    1. Run the following commands on the local machine to generate the required SSH key:
    ssh-keygen -t rsa -f ~/.ssh/sftp-ssh-key -C user_a
    

    “sftp-ssh-key” → Name of the pair of public and private keys (Public key: sftp-ssh-key.pub, Private key: sftp-ssh-key)

    “user_a” → User in the VM that we are trying to connect to

    chmod 400 ~/.ssh/sftp-ssh-key
    
    2. Now, copy the contents of the public key sftp-ssh-key.pub into ~/.ssh/authorized_keys on your host system. Check the permissions on authorized_keys and adjust them with chmod if needed.

      I tested the setup with a Compute Engine VM. In the Compute Engine console, edit the VM settings to add the contents of the generated SSH public key to the instance metadata. Detailed instructions can be found here. If you are connecting to a Compute Engine VM, make sure that the instance has the appropriate firewall rule to allow the SSH connection.

    3. Upload the private key to the client machine. In this scenario, the client is the Airflow DAG, so the key file should be accessible from the Composer/Airflow environment. To make the key file accessible, it has to be uploaded to the GCS bucket associated with the Composer environment. For example, if the private key is uploaded to the data folder in the bucket, the key file path would be /home/airflow/gcs/data/sftp-ssh-key. One way to do the upload from Python is sketched below.
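
    As an alternative to the Cloud Console or gsutil, the key can be uploaded with the google-cloud-storage Python client. This is only a sketch; the bucket name and local key path below are placeholders for your own values.

    from google.cloud import storage

    # Placeholder bucket name; use the GCS bucket linked to your Composer environment.
    client = storage.Client()
    bucket = client.bucket("us-central1-my-composer-env-bucket")
    # Objects under data/ are visible to the Airflow workers at /home/airflow/gcs/data/.
    bucket.blob("data/sftp-ssh-key").upload_from_filename("/home/user_a/.ssh/sftp-ssh-key")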


    Configuring the SSH connection with password authentication

    If password authentication is not configured on the host machine, follow the steps below to enable it.

    1. Set the user password using the command below and enter the new password twice.
    sudo passwd user_a
    
    2. To enable SSH password authentication, SSH into the host machine as root and edit the sshd_config file:
    /etc/ssh/sshd_config
    
    3. Then, change the line PasswordAuthentication no to PasswordAuthentication yes. After making that change, restart the SSH service by running the following command as root.
    sudo service ssh restart
    

    Password authentication is now configured.
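
    Before wiring the password into Airflow, it can help to confirm that password authentication works with a quick standalone paramiko check. This is only a sketch; the host, username and password values are placeholders.

    import paramiko

    # Placeholder values; replace with your own host, user and password.
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # skip host key verification for this one-off test
    client.connect("203.0.113.10", username="user_a", password="your-password")
    sftp = client.open_sftp()
    print(sftp.listdir("."))  # list the remote home directory to confirm the SFTP channel works
    client.close()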


    Creating connections and uploading the DAG

    1.1 Airflow connection with key authentication

    Create a connection in Airflow with the configuration below, or update your existing connection.

    [Screenshot: Airflow connection configuration for key-based authentication]

    Extra field

    The Extra JSON dictionary would look like this. Here, we have uploaded the private key file to the data folder in the Composer environment's GCS bucket.

    {
       "key_file": "/home/airflow/gcs/data/sftp-ssh-key",
       "conn_timeout": "30",
       "look_for_keys": "false"
    }
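
    If you prefer to create the connection programmatically instead of through the Airflow UI, a minimal sketch using Airflow's Connection model is shown below. The host value is a placeholder, the extras mirror the JSON above, and the snippet needs to run somewhere with access to the Airflow metadata database (for example inside the Composer environment).

    import json
    from airflow import settings
    from airflow.models import Connection

    # Placeholder host; the extras mirror the JSON shown above.
    conn = Connection(conn_id="sftp_connection",
                      conn_type="ssh",
                      host="203.0.113.10",
                      login="user_a",
                      extra=json.dumps({"key_file": "/home/airflow/gcs/data/sftp-ssh-key",
                                        "conn_timeout": "30",
                                        "look_for_keys": "false"}))

    session = settings.Session()
    session.add(conn)
    session.commit()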
    
    

    1.2 Airflow connection with password authentication

    If the host machine is configured to allow password authentication, these are the changes to be made in the Airflow connection. The Extra parameter can be empty.

    [Screenshot: Airflow connection configuration for password authentication]

    The Password parameter is user_a's password on the host machine.
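
    The same programmatic approach works for the password-based connection; a minimal sketch, with placeholder host and password values:

    from airflow import settings
    from airflow.models import Connection

    # Placeholder host and password; Extra can stay empty for password authentication.
    conn = Connection(conn_id="sftp_connection",
                      conn_type="ssh",
                      host="203.0.113.10",
                      login="user_a",
                      password="your-password")

    session = settings.Session()
    session.add(conn)
    session.commit()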

    The task logs show that the password authentication was successful.

    INFO - Authentication (password) successful!
    
    2. Upload the DAG to the Composer environment and trigger the DAG. I was facing a key validation issue with the latest version of the paramiko library (2.9.2). I tried downgrading paramiko, but the older versions do not seem to support OPENSSH keys. I found an alternative, paramiko-ng, in which the validation issue has been fixed, and changed the Python dependency from paramiko to paramiko-ng in the PythonVirtualenvOperator.
    from airflow import DAG
    from airflow.operators.python_operator import PythonVirtualenvOperator
    from airflow.operators.dummy_operator import DummyOperator
    from datetime import datetime, timedelta
    
    
    def make_sftp():
        import paramiko
        from airflow.contrib.hooks.ssh_hook import SSHHook
    
        ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
        sftp_client = ssh_hook.get_conn().open_sftp()
        
        print("=================SFTP Connection Successful=================")
        
        remote_host = "/home/sftp-folder/sample_sftp_file" # file path in the host system
        local_host = "/home/airflow/gcs/data/sample_sftp_file" # file path in the client system
        
        sftp_client.get(remote_host,local_host) # GET operation to copy file from host to client
        
        sftp_client.close()
        return 0
    
    
    etl_dag = DAG("sftp_dag",
                    start_date=datetime.now(),
                    schedule_interval=None,
                    default_args={
                        "owner": "airflow",
                        "depends_on_past": False,
                        "email_on_failure": False,
                        "email_on_retry": False,
                        "retries": 5,
                        "retry_delay": timedelta(minutes=5)})
    
    sftp = PythonVirtualenvOperator(task_id="sftp",
                                    python_callable=make_sftp,
                                    requirements=["sshtunnel", "paramiko-ng", "pysftp"],
                                    dag=etl_dag)
    
    start_pipeline = DummyOperator(task_id="start_pipeline", dag=etl_dag)
    
    start_pipeline >> sftp
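
    The DAG above copies a file from the host to the Composer bucket with get(). Since the original question was about uploading a file to the external server, the reverse direction uses put(). A minimal sketch of such a callable follows; the function name and both paths are placeholders.

    def upload_sftp():
        from airflow.contrib.hooks.ssh_hook import SSHHook

        ssh_hook = SSHHook(ssh_conn_id="sftp_connection")
        sftp_client = ssh_hook.get_conn().open_sftp()

        local_path = "/home/airflow/gcs/data/file_to_upload"  # file staged in the Composer bucket (client)
        remote_path = "/home/sftp-folder/file_to_upload"      # destination path on the external server (host)

        sftp_client.put(local_path, remote_path)  # PUT operation copies the file from client to host
        sftp_client.close()
        return 0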
    

    Results

    The sample_sftp_file has been copied from the host system to the specified Composer bucket.

    [Screenshot: sample_sftp_file in the data folder of the Composer environment's bucket]