
Unable to read SSHOperator output in Airflow


I am having trouble understanding how to extract the output of SSHOperator in Airflow.

Here's my complete workflow:

import base64
import pendulum
from airflow.decorators import dag, task
from airflow.providers.ssh.operators.ssh import SSHOperator


@dag(
    start_date=pendulum.datetime(2024, 3, 1, tz="UTC"),
    tags=["test"],
)
def run_ssh():
    @task
    def ssh(remote_host):
        op = SSHOperator(task_id="ssh", remote_host=remote_host, command="ls -l")
        return op.output

    @task
    def read(value):
        # value = base64.b64decode(value).decode("utf-8")
        print("*********", value)

    read(ssh("myhost"))


run_ssh()

What gets printed in the task logs is:

********* {{ task_instance.xcom_pull(task_ids='ssh', dag_id='adhoc_***', key='return_value') }}

The SSHOperator source seems to return the aggregated stdout (base64 encoded). Even when I uncomment the base64 decode line, the same templated string is printed. What am I misunderstanding in this pipeline?


Solution

  • An operator instantiated inside a `@task`-decorated function is never executed: neither the Airflow scheduler nor the Airflow worker knows it exists, because it was never added to the DAG.

    So `op.output` just returns the Jinja template expression for an XCom pull from a task that never ran. Since templates are only rendered in an operator's templated fields, the raw template string is what gets passed to your `read` task. Instead, instantiate the operator at the DAG level and pass its `.output` to the downstream task:

    import base64
    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.ssh.operators.ssh import SSHOperator
    
    
    @dag(
        start_date=pendulum.datetime(2024, 3, 1, tz="UTC"),
        tags=["test"],
    )
    def run_ssh():
        op = SSHOperator(task_id="ssh", remote_host="myhost", command="ls -l")
    
        @task
        def read(value):
            # With XCom pickling disabled (the default), SSHOperator pushes
            # its aggregated stdout base64-encoded, so decode it here.
            value = base64.b64decode(value).decode("utf-8")
            print("*********", value)
    
        read(op.output)
    
    run_ssh()
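
To see why the decode step matters, here is a minimal sketch of the round-trip outside Airflow, using a made-up stdout value in place of what SSHOperator would push to XCom:

```python
import base64

# Hypothetical stdout of the remote "ls -l", as bytes.
raw_stdout = b"total 0\n-rw-r--r-- 1 user user 0 Mar  1 00:00 file.txt\n"

# SSHOperator (with XCom pickling disabled) pushes the
# aggregated stdout base64-encoded as a string.
xcom_value = base64.b64encode(raw_stdout).decode("utf-8")

# The read task then reverses the encoding to get readable text.
decoded = base64.b64decode(xcom_value).decode("utf-8")
print(decoded)
```

If you would rather skip the decode entirely, you can also pass `do_xcom_push=False` and read the output some other way, or decode once in the first downstream task as shown in the solution above.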