Airflow custom plugin: Value is not returned in XCOM even though I am able to view the value in log


I am trying to push the result of a SQL command to XCom from a custom operator built on an Airflow SQL hook. I can see the command's result in the log, but it is not pushed to XCom.

from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator


class SqlExecuteOperator(BaseOperator):
    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    # @apply_defaults is not needed on Airflow 2.x, so it is omitted here.
    def __init__(
            self, sql,
            conn_id=None,
            database=None,
            *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql
        self.database = database

    def execute(self, context):
        # Airflow passes the task context here; the return value becomes the XCom.
        self.log.info('Executing SQL statement: %s', self.sql)
        records = self.get_db_hook().get_first(self.sql)
        self.log.info('Record: %s', records)
        return int(records[0])

    def get_db_hook(self):
        conn = BaseHook.get_connection(conn_id=self.conn_id)

        # Build the hook that matches the connection type.
        hook = BaseHook.get_hook(conn_id=self.conn_id)
        hook.connection = conn

        # Optionally override the database (schema) defined on the connection.
        if self.database:
            hook.schema = self.database

        return hook

I am using the operator as follows:

    @task
    def get_results_from_sql(**kwargs):
        sql_task_op = SqlExecuteOperator(
            task_id="sql_task",
            conn_id=SQL_CONNECTION,
            sql="SELECT 1",
            database='TEST',
            do_xcom_push=True,
        )
        sql_task_op.execute(context=dict())

Here are the log details:

95648bf8cf8b
*** Found local files:
***   * /opt/airflow/logs/dag_id=SQL_TEST/run_id=manual__2023-12-22T03:53:23.673472+00:00/task_id=get_results_from_sql/attempt=1.log
[2023-12-21, 22:53:24 EST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [queued]>
[2023-12-21, 22:53:24 EST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [queued]>
[2023-12-21, 22:53:24 EST] {taskinstance.py:1361} INFO - Starting attempt 1 of 2
[2023-12-21, 22:53:24 EST] {taskinstance.py:1382} INFO - Executing <Task(_PythonDecoratedOperator): get_results_from_sql> on 2023-12-22 03:53:23.673472+00:00
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:57} INFO - Started process 605102 to run task
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'SQL_TEST', 'get_results_from_sql', 'manual__2023-12-22T03:53:23.673472+00:00', '--job-id', '266440', '--raw', '--subdir', 'DAGS_FOLDER/sql_test.py', '--cfg-path', '/tmp/tmp0b98o6o4']
[2023-12-21, 22:53:24 EST] {standard_task_runner.py:85} INFO - Job 266440: Subtask get_results_from_sql
[2023-12-21, 22:53:24 EST] {task_command.py:416} INFO - Running <TaskInstance: SQL_TEST.get_results_from_sql manual__2023-12-22T03:53:23.673472+00:00 [running]> on host 95648bf8cf8b
[2023-12-21, 22:53:25 EST] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SQL_TEST' AIRFLOW_CTX_TASK_ID='get_results_from_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-12-22T03:53:23.673472+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-22T03:53:23.673472+00:00'
[2023-12-21, 22:53:25 EST] {sql_test.py:30} INFO - Executing SQL statement: SELECT 1
[2023-12-21, 22:53:25 EST] {base.py:73} INFO - Using connection ID 'MS_TES' for task execution.
[2023-12-21, 22:53:25 EST] {sql.py:418} INFO - Running statement: SELECT 1, parameters: None
[2023-12-21, 22:53:25 EST] {sql_test.py:32} INFO - Record: (1,)
[2023-12-21, 22:53:25 EST] {python.py:194} INFO - Done. Returned value was: None
[2023-12-21, 22:53:25 EST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=SQL_TEST, task_id=get_results_from_sql, execution_date=20231222T035323, start_date=20231222T035324, end_date=20231222T035325
[2023-12-21, 22:53:25 EST] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-12-21, 22:53:25 EST] {taskinstance.py:2778} INFO - 1 downstream tasks scheduled from follow-on schedule check

I am not sure what I am missing. Please suggest a solution if you have faced a similar issue.


Solution

  • As has already been mentioned, you should not call execute methods directly. Airflow calls them itself and uses their return values to create the XCom messages.
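
The log line "Returned value was: None" in the question fits this: the @task function calls execute() by hand and then drops the result, so Airflow has nothing to push. The effect is plain Python and can be reproduced without Airflow. A minimal sketch, in which FakeOperator, task_without_return and task_with_return are hypothetical stand-ins and not Airflow APIs:

```python
class FakeOperator:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    def execute(self, context):
        # Pretend this is the first column of the first row of "SELECT 1".
        return 1


def task_without_return():
    # Mirrors the question: execute() runs, but its result is dropped.
    FakeOperator().execute(context={})


def task_with_return():
    # Returning the value gives the caller something to store.
    return FakeOperator().execute(context={})


print(task_without_return())  # None
print(task_with_return())     # 1
```

Adding a return statement would only patch the symptom. The approaches in the rest of this answer avoid calling execute() by hand altogether.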

    MsSqlOperator demo DAG

    The simplest approach is to let MsSqlOperator return the raw result set, as in this dags/mssql_operator_xcom_dag.py file:

    from airflow.decorators import dag, task
    from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
    from datetime import datetime
    
    @dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
    def mssql_operator_xcom_dag():
    
        @task
        def display_results(ti):
            xcom_results = ti.xcom_pull(key="return_value", task_ids="query_results")[0][0]
            return f'query_results said, "{xcom_results}"'
    
        query = MsSqlOperator(
            task_id="query_results",
            mssql_conn_id="app_env_instanceid_mssql",
            database="msdb",
            sql="SELECT 'Hello, MsSqlOperator!';"
        )
    
        display = display_results()
    
        query >> display
    
    mssql_operator_xcom_dag()
    

    After executing this DAG, you'll see XCom messages output from the query_results and display_results tasks.

    [Screenshots: XCom output of the query_results and display_results tasks]

    MsSqlHook demo DAG

    The MsSqlOperator calls MsSqlHook to do most of the work, so you can restructure things slightly to get exactly the same results using MsSqlHook directly, as in this dags/mssql_hook_xcom_dag.py file:

    from airflow.decorators import dag, task
    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
    from datetime import datetime
    
    @dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
    def mssql_hook_xcom_dag():
    
        @task
        def query_results(conn_id, schema, sql):
            mssql = MsSqlHook(mssql_conn_id=conn_id, schema=schema)
            results = mssql.get_records(sql=sql)
            return results
    
        @task
        def display_results(ti):
            xcom_results = ti.xcom_pull(key="return_value", task_ids="query_results")[0][0]
            return f'query_results said, "{xcom_results}"'
    
        query = query_results(
            conn_id="app_env_instanceid_mssql",
            schema="msdb",
            sql="SELECT 'Hello, MsSqlHook!';"
        )
    
        display = display_results()
    
        query >> display
    
    mssql_hook_xcom_dag()
    

    After executing this DAG, you'll see XCom messages output from the query_results and display_results tasks.

    [Screenshots: XCom output of the query_results and display_results tasks]

    Of the two approaches, MsSqlHook can be more useful because it lets you return the results in different formats, for example a DataFrame by calling mssql.get_pandas_df(sql=sql) instead.
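
Whichever format is chosen, the value the task returns is what ends up in XCom, so small, plainly serializable results tend to be the easiest to pull downstream. A minimal sketch of one option, assuming the rows arrive as a list of tuples (consistent with the [0][0] indexing used in display_results) and that the column names are known in advance; rows_to_dicts is a hypothetical helper, not part of Airflow:

```python
import json


def rows_to_dicts(rows, columns):
    """Pair each row's values with column names (hypothetical helper)."""
    return [dict(zip(columns, row)) for row in rows]


# Example data shaped like the demo query's result: one row, one column.
rows = [("Hello, MsSqlHook!",)]
records = rows_to_dicts(rows, columns=["greeting"])

print(records[0]["greeting"])  # Hello, MsSqlHook!
print(json.dumps(records))     # [{"greeting": "Hello, MsSqlHook!"}]
```

A downstream task could then read records[0]["greeting"] instead of relying on positional [0][0] indexing.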

    Orchestrating The Orchestrator

    The above DAGs can be demonstrated quickly in Docker containers, based on Apache's demo compose file from https://airflow.apache.org/docs/apache-airflow/2.8.0/docker-compose.yaml with the addition of an SQL Server 2019 container to test MsSqlOperator and MsSqlHook:

    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    #
    
    # Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
    #
    # WARNING: This configuration is for local development. Do not use it in a production deployment.
    #
    # This configuration supports basic configuration using environment variables or an .env file
    # The following variables are supported:
    #
    # AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
    #                                Default: apache/airflow:2.8.0
    # AIRFLOW_UID                  - User ID in Airflow containers
    #                                Default: 50000
    # AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
    #                                Default: .
    # Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
    #
    # _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
    #                                Default: airflow
    # _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
    #                                Default: airflow
    # _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
    #                                Use this option ONLY for quick checks. Installing requirements at container
    #                                startup is done EVERY TIME the service is started.
    #                                A better way is to build a custom image or extend the official image
    #                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
    #                                Default: ''
    #
    # Feel free to modify this file to suit your needs.
    ---
    x-airflow-common:
      &airflow-common
      # In order to add custom dependencies or upgrade provider packages you can use your extended image.
      # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
      # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
      image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
      # build: .
      environment:
        &airflow-common-env
        AIRFLOW__CORE__EXECUTOR: CeleryExecutor
        AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
        AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
        AIRFLOW__CORE__FERNET_KEY: ''
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
        AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
        AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
        # yamllint disable rule:line-length
        # Use simple http server on scheduler for health checks
        # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
        # yamllint enable rule:line-length
        AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
        # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
        # for other purpose (development, test and especially production usage) build/extend Airflow image.
        _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
      volumes:
        - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
        - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
        - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
        - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
      user: "${AIRFLOW_UID:-50000}:0"
      depends_on:
        &airflow-common-depends-on
        redis:
          condition: service_healthy
        postgres:
          condition: service_healthy
    
    services:
      sql2019:
        image: mcr.microsoft.com/mssql/server:2019-latest
        container_name: sql2019
        environment:
          - "ACCEPT_EULA=Y"
          - "MSSQL_SA_PASSWORD=StrongPassw0rd"
        restart: always
    
      postgres:
        image: postgres:13
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        volumes:
          - postgres-db-volume:/var/lib/postgresql/data
        healthcheck:
          test: ["CMD", "pg_isready", "-U", "airflow"]
          interval: 10s
          retries: 5
          start_period: 5s
        restart: always
    
      redis:
        image: redis:latest
        expose:
          - 6379
        healthcheck:
          test: ["CMD", "redis-cli", "ping"]
          interval: 10s
          timeout: 30s
          retries: 50
          start_period: 30s
        restart: always
    
      airflow-webserver:
        <<: *airflow-common
        command: webserver
        ports:
          - "8080:8080"
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-scheduler:
        <<: *airflow-common
        command: scheduler
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-worker:
        <<: *airflow-common
        command: celery worker
        healthcheck:
          # yamllint disable rule:line-length
          test:
            - "CMD-SHELL"
            - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        environment:
          <<: *airflow-common-env
          # Required to handle warm shutdown of the celery workers properly
          # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
          DUMB_INIT_SETSID: "0"
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-triggerer:
        <<: *airflow-common
        command: triggerer
        healthcheck:
          test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
      airflow-init:
        <<: *airflow-common
        entrypoint: /bin/bash
        # yamllint disable rule:line-length
        command:
          - -c
          - |
            if [[ -z "${AIRFLOW_UID}" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
              echo "If you are on Linux, you SHOULD follow the instructions below to set "
              echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
              echo "For other operating systems you can get rid of the warning with manually created .env file:"
              echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
              echo
            fi
            one_meg=1048576
            mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
            cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
            disk_available=$$(df / | tail -1 | awk '{print $$4}')
            warning_resources="false"
            if (( mem_available < 4000 )) ; then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
              echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
              echo
              warning_resources="true"
            fi
            if (( cpus_available < 2 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
              echo "At least 2 CPUs recommended. You have $${cpus_available}"
              echo
              warning_resources="true"
            fi
            if (( disk_available < one_meg * 10 )); then
              echo
              echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
              echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
              echo
              warning_resources="true"
            fi
            if [[ $${warning_resources} == "true" ]]; then
              echo
              echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
              echo "Please follow the instructions to increase amount of resources available:"
              echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
              echo
            fi
            mkdir -p /sources/logs /sources/dags /sources/plugins
            chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
            exec /entrypoint airflow version
        # yamllint enable rule:line-length
        environment:
          <<: *airflow-common-env
          _AIRFLOW_DB_MIGRATE: 'true'
          _AIRFLOW_WWW_USER_CREATE: 'true'
          _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
          _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
          _PIP_ADDITIONAL_REQUIREMENTS: ''
        user: "0:0"
        volumes:
          - ${AIRFLOW_PROJ_DIR:-.}:/sources
    
      airflow-cli:
        <<: *airflow-common
        profiles:
          - debug
        environment:
          <<: *airflow-common-env
          CONNECTION_CHECK_MAX_COUNT: "0"
        # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
        command:
          - bash
          - -c
          - airflow
    
      # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
      # or by explicitly targeted on the command line e.g. docker-compose up flower.
      # See: https://docs.docker.com/compose/profiles/
      flower:
        <<: *airflow-common
        command: celery flower
        profiles:
          - flower
        ports:
          - "5555:5555"
        healthcheck:
          test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
          interval: 30s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: always
        depends_on:
          <<: *airflow-common-depends-on
          airflow-init:
            condition: service_completed_successfully
    
    volumes:
      postgres-db-volume:
    

    And a simple up.sh script to start the composition, install the required Airflow providers and create the Connection that's used in the above DAGs:

    #!/bin/bash
    
    function install_to_container {
        CONTAINER_ID=$1
        docker exec -it -u root $CONTAINER_ID bash -c '
            apt-get update
            apt-get install --no-install-recommends --yes build-essential manpages-dev unixodbc-dev
    '
        docker exec -it -u airflow $CONTAINER_ID bash -c '
            python -m pip install --upgrade pip
            pip install apache-airflow-providers-odbc
            pip install apache-airflow-providers-microsoft-mssql
    '
        docker container restart $CONTAINER_ID
    } 
    
    docker compose up airflow-init
    docker compose up -d
    
    install_to_container sql2019-apache-airflow-airflow-webserver-1
    install_to_container sql2019-apache-airflow-airflow-worker-1
    install_to_container sql2019-apache-airflow-airflow-scheduler-1
    
    curl -X POST 'http://localhost:8080/api/v1/connections' \
        --user 'airflow:airflow' \
        -H 'Content-Type: application/json' \
        -d '{
        "connection_id": "app_env_instanceid_mssql",
        "conn_type": "mssql",
        "host": "sql2019",
        "login": "sa",
        "schema": "msdb",
        "port": 1433,
        "password": "StrongPassw0rd"
    }'
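
For anyone who prefers to script the connection setup in Python, the curl call above can be approximated with the standard library alone. This is a sketch under the same assumptions as up.sh (webserver on localhost:8080, the default airflow/airflow credentials, basic auth enabled); build_connection_request is a hypothetical helper name, and the request is only built here, not sent:

```python
import base64
import json
import urllib.request


def build_connection_request(base_url, user, password, payload):
    """Build (but do not send) the POST request that the curl call makes."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url}/api/v1/connections",
        data=json.dumps(payload).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


# Same connection definition as in up.sh.
payload = {
    "connection_id": "app_env_instanceid_mssql",
    "conn_type": "mssql",
    "host": "sql2019",
    "login": "sa",
    "schema": "msdb",
    "port": 1433,
    "password": "StrongPassw0rd",
}
request = build_connection_request(
    "http://localhost:8080", "airflow", "airflow", payload
)
print(request.get_method(), request.full_url)

# To send it against a running webserver:
# urllib.request.urlopen(request)
```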
    

    And, finally, a down.sh script to tear down the Docker containers and their volumes:

    #!/bin/bash
    
    docker compose down --volumes --remove-orphans