Search code examples
airflowhashicorp-vault

Airflow deployment stuck after using config from Vault backend


I have an Airflow application running on Kubernetes that is using Vault as a secret backend. Recently I manage to move my config value sql_alchemy_conn to the Vault as it contains password for the user. I can see that it is fetching the value from secret backend and able to connect to database and run migration job.

But since then I cannot deploy the rest of the application because all other resources are stuck in the init container wait-for-airflow-migrations. I am using the official helm chart to deploy the application and this is the python code that it uses to check if the migrations are run

import airflow
import logging
import os
import time

from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory

from airflow import settings

package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
script_ = ScriptDirectory.from_config(config)

timeout=60

with settings.engine.connect() as connection:
    context = MigrationContext.configure(connection)
    ticker = 0
    while True:
        source_heads = set(script_.get_heads())

        db_heads = set(context.get_current_heads())
        if source_heads == db_heads:
            break

        if ticker >= timeout:
            raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
        ticker += 1
        time.sleep(1)
        logging.info('Waiting for migrations... %s second(s)', ticker)

Here's the logs of the wait-for-migration container

[2021-07-29 12:01:13,650] {migration.py:164} INFO - Context impl SQLiteImpl.
[2021-07-29 12:01:13,650] {migration.py:171} INFO - Will assume non-transactional DDL.
[2021-07-29 12:01:20,592] {<string>:49} INFO - Waiting for migrations... 1 second(s)
[2021-07-29 12:01:21,593] {<string>:49} INFO - Waiting for migrations... 2 second(s)
[2021-07-29 12:01:22,595] {<string>:49} INFO - Waiting for migrations... 3 second(s)
[2021-07-29 12:01:23,596] {<string>:49} INFO - Waiting for migrations... 4 second(s)
[2021-07-29 12:01:24,597] {<string>:49} INFO - Waiting for migrations... 5 second(s)
[2021-07-29 12:01:25,598] {<string>:49} INFO - Waiting for migrations... 6 second(s)
[2021-07-29 12:01:26,600] {<string>:49} INFO - Waiting for migrations... 7 second(s)
[2021-07-29 12:01:27,601] {<string>:49} INFO - Waiting for migrations... 8 second(s)
[2021-07-29 12:01:28,602] {<string>:49} INFO - Waiting for migrations... 9 second(s)
[2021-07-29 12:01:29,603] {<string>:49} INFO - Waiting for migrations... 10 second(s)
[2021-07-29 12:01:30,604] {<string>:49} INFO - Waiting for migrations... 11 second(s)
[2021-07-29 12:01:31,606] {<string>:49} INFO - Waiting for migrations... 12 second(s)
[2021-07-29 12:01:32,607] {<string>:49} INFO - Waiting for migrations... 13 second(s)
[2021-07-29 12:01:33,608] {<string>:49} INFO - Waiting for migrations... 14 second(s)
[2021-07-29 12:01:34,610] {<string>:49} INFO - Waiting for migrations... 15 second(s)
[2021-07-29 12:01:35,611] {<string>:49} INFO - Waiting for migrations... 16 second(s)
[2021-07-29 12:01:36,612] {<string>:49} INFO - Waiting for migrations... 17 second(s)
[2021-07-29 12:01:37,613] {<string>:49} INFO - Waiting for migrations... 18 second(s)
[2021-07-29 12:01:38,614] {<string>:49} INFO - Waiting for migrations... 19 second(s)
[2021-07-29 12:01:39,615] {<string>:49} INFO - Waiting for migrations... 20 second(s)
[2021-07-29 12:01:40,616] {<string>:49} INFO - Waiting for migrations... 21 second(s)
[2021-07-29 12:01:41,617] {<string>:49} INFO - Waiting for migrations... 22 second(s)
[2021-07-29 12:01:42,618] {<string>:49} INFO - Waiting for migrations... 23 second(s)
[2021-07-29 12:01:43,619] {<string>:49} INFO - Waiting for migrations... 24 second(s)
[2021-07-29 12:01:44,621] {<string>:49} INFO - Waiting for migrations... 25 second(s)
[2021-07-29 12:01:45,622] {<string>:49} INFO - Waiting for migrations... 26 second(s)
[2021-07-29 12:01:46,623] {<string>:49} INFO - Waiting for migrations... 27 second(s)
[2021-07-29 12:01:47,625] {<string>:49} INFO - Waiting for migrations... 28 second(s)
[2021-07-29 12:01:48,626] {<string>:49} INFO - Waiting for migrations... 29 second(s)
[2021-07-29 12:01:49,628] {<string>:49} INFO - Waiting for migrations... 30 second(s)
[2021-07-29 12:01:50,628] {<string>:49} INFO - Waiting for migrations... 31 second(s)
[2021-07-29 12:01:51,630] {<string>:49} INFO - Waiting for migrations... 32 second(s)
[2021-07-29 12:01:52,631] {<string>:49} INFO - Waiting for migrations... 33 second(s)
[2021-07-29 12:01:53,632] {<string>:49} INFO - Waiting for migrations... 34 second(s)
[2021-07-29 12:01:54,634] {<string>:49} INFO - Waiting for migrations... 35 second(s)
[2021-07-29 12:01:55,635] {<string>:49} INFO - Waiting for migrations... 36 second(s)
[2021-07-29 12:01:56,636] {<string>:49} INFO - Waiting for migrations... 37 second(s)
[2021-07-29 12:01:57,637] {<string>:49} INFO - Waiting for migrations... 38 second(s)
[2021-07-29 12:01:58,638] {<string>:49} INFO - Waiting for migrations... 39 second(s)
[2021-07-29 12:01:59,639] {<string>:49} INFO - Waiting for migrations... 40 second(s)
[2021-07-29 12:02:00,641] {<string>:49} INFO - Waiting for migrations... 41 second(s)
[2021-07-29 12:02:01,641] {<string>:49} INFO - Waiting for migrations... 42 second(s)
[2021-07-29 12:02:02,642] {<string>:49} INFO - Waiting for migrations... 43 second(s)
[2021-07-29 12:02:03,644] {<string>:49} INFO - Waiting for migrations... 44 second(s)
[2021-07-29 12:02:04,644] {<string>:49} INFO - Waiting for migrations... 45 second(s)
[2021-07-29 12:02:05,646] {<string>:49} INFO - Waiting for migrations... 46 second(s)
[2021-07-29 12:02:06,647] {<string>:49} INFO - Waiting for migrations... 47 second(s)
[2021-07-29 12:02:07,648] {<string>:49} INFO - Waiting for migrations... 48 second(s)
[2021-07-29 12:02:08,649] {<string>:49} INFO - Waiting for migrations... 49 second(s)
[2021-07-29 12:02:09,651] {<string>:49} INFO - Waiting for migrations... 50 second(s)
[2021-07-29 12:02:10,651] {<string>:49} INFO - Waiting for migrations... 51 second(s)
[2021-07-29 12:02:11,652] {<string>:49} INFO - Waiting for migrations... 52 second(s)
[2021-07-29 12:02:12,654] {<string>:49} INFO - Waiting for migrations... 53 second(s)
[2021-07-29 12:02:13,655] {<string>:49} INFO - Waiting for migrations... 54 second(s)
[2021-07-29 12:02:14,656] {<string>:49} INFO - Waiting for migrations... 55 second(s)
[2021-07-29 12:02:15,657] {<string>:49} INFO - Waiting for migrations... 56 second(s)
[2021-07-29 12:02:16,659] {<string>:49} INFO - Waiting for migrations... 57 second(s)
[2021-07-29 12:02:17,659] {<string>:49} INFO - Waiting for migrations... 58 second(s)
[2021-07-29 12:02:18,660] {<string>:49} INFO - Waiting for migrations... 59 second(s)
[2021-07-29 12:02:19,662] {<string>:49} INFO - Waiting for migrations... 60 second(s)
Traceback (most recent call last):
  File "<string>", line 46, in <module>
TimeoutError: There are still unapplied migrations after 60 seconds.

I did some debugging and found that it is using the default sql_alchemy_conn value from config file which is basically with sqlite engine.

Shall I update this script and fetch value manually from the Vault backend or am I missing some configuration?


Solution

  • I made it work (for now) by manually fetching the config value from Vault backend and creating the engine, instead of using settings.engine.connect(). Here is my wait-for-migration-command template

    {{ define "wait-for-migrations-command" }}
      {{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}}
      - python
      - -c
      - |
            import airflow
            import logging
            import os
            import time
    
            from airflow.providers.hashicorp.secrets.vault import VaultBackend
            from alembic.config import Config
            from alembic.runtime.migration import MigrationContext
            from alembic.script import ScriptDirectory
            from sqlalchemy.engine import create_engine
    
            try:
                sql_alchemy_conn = VaultBackend(
                    config_path="applications/secrets/airflow/config/",
                    auth_type="kubernetes",
                    kubernetes_role="kubernetes_role",
                ).get_config(key="sql_alchemy_conn")
            except Exception:
                raise
    
            package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
            directory = os.path.join(package_dir, "migrations")
            config = Config(os.path.join(package_dir, "alembic.ini"))
            config.set_main_option("script_location", directory)
            config.set_main_option("sqlalchemy.url", sql_alchemy_conn)
            script_ = ScriptDirectory.from_config(config)
    
            engine = create_engine(sql_alchemy_conn)
    
            timeout = 60
            with engine.connect() as connection:
                context = MigrationContext.configure(connection)
                ticker = 0
                while True:
                    source_heads = set(script_.get_heads())
    
                    db_heads = set(context.get_current_heads())
                    if source_heads == db_heads:
                        break
    
                    if ticker >= timeout:
                        raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
                    ticker += 1
                    time.sleep(1)
                    logging.info("Waiting for migrations... %s second(s)", ticker)
    {{- end }}
    

    I am sure there must be some other way to fix this. Will keep looking.