I am having trouble understanding how to extract the output of SSHOperator
in Airflow.
Here's my complete workflow:
import base64
import pendulum
from airflow.decorators import dag, task
from airflow.providers.ssh.operators.ssh import SSHOperator
@dag(
    start_date=pendulum.datetime(2024, 3, 1, tz="UTC"),
    tags=["test"],
)
def run_ssh():
    @task
    def ssh(remote_host):
        op = SSHOperator(task_id="ssh", remote_host=remote_host, command="ls -l")
        return op.output

    @task
    def read(value):
        # value = base64.b64decode(value).decode("utf-8")
        print("*********", value)

    read(ssh("myhost"))

run_ssh()
What gets printed in the task logs is:
********* {{ task_instance.xcom_pull(task_ids='ssh', dag_id='adhoc_***', key='return_value') }}
The SSHOperator
code seems to return the aggregated stdout
(base64-encoded). Even when I uncomment the base64 decode, it still prints the same templated value. What am I misunderstanding in this pipeline?
Operators created inside a @task-decorated function are never executed, because neither the Airflow scheduler nor the worker knows they exist. So calling op.output
there just returns the Jinja template string pointing at an XCom entry that the (never-run) task would have written, and that literal template is what gets passed to your read task. Define the SSHOperator at the DAG level instead and pass its .output into the downstream task:
import base64
import pendulum
from airflow.decorators import dag, task
from airflow.providers.ssh.operators.ssh import SSHOperator
@dag(
    start_date=pendulum.datetime(2024, 3, 1, tz="UTC"),
    tags=["test"],
)
def run_ssh():
    op = SSHOperator(task_id="ssh", remote_host="myhost", command="ls -l")

    @task
    def read(value):
        # value = base64.b64decode(value).decode("utf-8")
        print("*********", value)

    read(op.output)

run_ssh()
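Once the operator actually runs, the XCom value that reaches read is the aggregated stdout, base64-encoded (assuming the default setup where XCom pickling is disabled), so the commented-out decode line is exactly what you want. A minimal standalone sketch of that decode step, with a simulated XCom payload:

```python
import base64

# Simulated XCom payload: SSHOperator pushes the aggregated stdout of the
# remote command, base64-encoded, so the downstream task receives a string.
raw_stdout = b"total 4\n-rw-r--r-- 1 user user 12 Mar  1 10:00 file.txt\n"
xcom_value = base64.b64encode(raw_stdout).decode("ascii")

# What read() should do with the real XCom value:
decoded = base64.b64decode(xcom_value).decode("utf-8")
print("*********", decoded)
```

This prints the actual directory listing instead of the unrendered `{{ task_instance.xcom_pull(...) }}` template.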