Search code examples
pythonairflowazure-sql-database

Not able to connect to Azure SQL database from Airflow DAG


I'm a newbie to Airflow. I have installed Airflow on an Ubuntu VM and configured the connection properties to connect to my Azure SQL database. I have the following code in my DAG :

from airflow import DAG
from datetime import datetime
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

with DAG ('user_processing',start_date=datetime(2023,3,10), schedule_interval='@daily',catchup=False) as dag:
    create_table = MsSqlOperator(
        task_id = "create_table",
        mssql_conn_id="mssql",
        sql = """
        CREATE TABLE user_schema.user_det (
        id int,
        first_name VARCHAR(200),
        last_name VARCHAR(200)
        )
        """
    )

And when I try to run the task using the command airflow tasks test user_processing create_table 2023-10-02 , I'm getting the error below.

[2023-10-03T16:54:08.684+0000] {dagbag.py:539} INFO - Filling up the DagBag from /home/azureuser/airflow/dags
[2023-10-03T16:54:08.744+0000] {example_python_operator.py:89} WARNING - The virtalenv_python example task requires virtualenv, please install it.
[2023-10-03T16:54:08.754+0000] {tutorial_taskflow_api_virtualenv.py:29} WARNING - The tutorial_taskflow_api_virtualenv example DAG requires virtualenv, please install it.
[2023-10-03T16:54:08.876+0000] {example_local_kubernetes_executor.py:39} WARNING - Could not import DAGs in example_local_kubernetes_executor.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/example_dags/example_local_kubernetes_executor.py", line 37, in <module>
    from kubernetes.client import models as k8s
ModuleNotFoundError: No module named 'kubernetes'
[2023-10-03T16:54:08.877+0000] {example_local_kubernetes_executor.py:40} WARNING - Install Kubernetes dependencies with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:08.950+0000] {example_kubernetes_executor.py:38} WARNING - The example_kubernetes_executor example DAG requires the kubernetes provider. Please install it with: pip install apache-airflow[cncf.kubernetes]
[2023-10-03T16:54:09.115+0000] {workday.py:36} WARNING - Could not import pandas. Holidays will not be considered.
[2023-10-03T16:54:09.146+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: user_processing.create_table __airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__ [None]>
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1359} INFO - Starting attempt 1 of 1
[2023-10-03T16:54:09.151+0000] {taskinstance.py:1428} WARNING - cannot record queued_duration for task create_table because previous state change time has not been saved
[2023-10-03T16:54:09.152+0000] {taskinstance.py:1380} INFO - Executing <Task(MsSqlOperator): create_table> on 2023-10-02 00:00:00+00:00
[2023-10-03T16:54:09.172+0000] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='user_processing' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2023-10-02T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2023-10-03T14:54:14.207818+00:00__'
[2023-10-03T16:54:09.174+0000] {sql.py:274} INFO - Executing:
        CREATE TABLE user_schema.user_det (
        id int,
        first_name VARCHAR(200),
        last_name VARCHAR(200)
        )

[2023-10-03T16:54:09.177+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:09.323+0000] {base.py:73} INFO - Using connection ID 'mssql' for task execution.
[2023-10-03T16:54:10.704+0000] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
  File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
  File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
  File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
  File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
    output = hook.run(
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
    with closing(self.get_conn()) as conn:
  File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
    conn = pymssql.connect(
  File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")
[2023-10-03T16:54:10.716+0000] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=user_processing, task_id=create_table, execution_date=20231002T000000, start_date=, end_date=20231003T165410
Traceback (most recent call last):
  File "src/pymssql/_pymssql.pyx", line 647, in pymssql._pymssql.connect
  File "src/pymssql/_mssql.pyx", line 2109, in pymssql._mssql.connect
  File "src/pymssql/_mssql.pyx", line 701, in pymssql._mssql.MSSQLConnection.__init__
  File "src/pymssql/_mssql.pyx", line 1818, in pymssql._mssql.maybe_raise_MSSQLDatabaseException
  File "src/pymssql/_mssql.pyx", line 1835, in pymssql._mssql.raise_MSSQLDatabaseException
pymssql._mssql.MSSQLDatabaseException: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 59, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 620, in task_test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1832, in run
    self._run_raw_task(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1516, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode, session=session)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1679, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1742, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/operators/sql.py", line 280, in execute
    output = hook.run(
  File "/usr/local/lib/python3.8/dist-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
    with closing(self.get_conn()) as conn:
  File "/home/azureuser/.local/lib/python3.8/site-packages/airflow/providers/microsoft/mssql/hooks/mssql.py", line 98, in get_conn
    conn = pymssql.connect(
  File "src/pymssql/_pymssql.pyx", line 653, in pymssql._pymssql.connect
pymssql._pymssql.OperationalError: (18456, b"Login failed for user 'rohan1997'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n").

I'm pretty sure I configured the connection correctly in airflow connections. This is the connection object I'm configuring in Airflow - Airflow Connection Object. This is my database connection string: Driver={ODBC Driver 18 for SQL Server};Server=tcp:mydbserver.database.windows.net,1433;Database=mydatabase;Uid=loginname;Pwd={your_password_here};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;

The below code works just fine (ran without airflow):

import pymssql


try:
    # Establish a database connection
    connection = pymssql.connect(server='server.database.windows.net',port='1433', user='username', password='pwd', database='database')

    # Create a cursor object for executing SQL commands
    cursor = connection.cursor()

    # Example: Execute a SQL query
    cursor.execute("SELECT * FROM user_schema.demo")

    # Fetch and print results
    rows = cursor.fetchall()
    for row in rows:
        print(row)

    # Close the cursor and connection
    cursor.close()
    connection.close()

except pymssql.Error as e:
    print(f"Error: {str(e)}")

I'm not able to figure out how to fix this. I've added the VM's IP address to the Azure SQL Server firewall.


Solution

  • Finally I was able to figure out why I was getting the exception :- b"Login failed for user 'username'.DB-Lib error message 20018, severity 14:\nGeneral SQL Server error: Check messages from the SQL Server\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (airflowtutserver.database.windows.net)\n" through trial and error method.

    Whenever we try to add a new connection through Airflow UI, we don't get an option to specify the database name as depicted in this image. So sql server doesn't know which database to use and that is why it rejects the connection.

    Now the solution :- Add the database name in the task as well as the query itself as depicted in this image. This tells the server the name of the database you want to use. Hence you'll not face this exception.