I have an Airflow application running on Kubernetes that is using Vault as a secret backend. Recently I manage to move my config value sql_alchemy_conn
to the Vault as it contains password for the user.
I can see that it is fetching the value from secret backend and able to connect to database and run migration job.
But since then I cannot deploy the rest of the application because all other resources are stuck in the init container wait-for-airflow-migrations
.
I am using the official helm chart to deploy the application and this is the python code that it uses to check if the migrations are run
import airflow
import logging
import os
import time
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from airflow import settings
package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
script_ = ScriptDirectory.from_config(config)
timeout=60
with settings.engine.connect() as connection:
context = MigrationContext.configure(connection)
ticker = 0
while True:
source_heads = set(script_.get_heads())
db_heads = set(context.get_current_heads())
if source_heads == db_heads:
break
if ticker >= timeout:
raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
ticker += 1
time.sleep(1)
logging.info('Waiting for migrations... %s second(s)', ticker)
Here's the logs of the wait-for-migration
container
[2021-07-29 12:01:13,650] {migration.py:164} INFO - Context impl SQLiteImpl.
[2021-07-29 12:01:13,650] {migration.py:171} INFO - Will assume non-transactional DDL.
[2021-07-29 12:01:20,592] {<string>:49} INFO - Waiting for migrations... 1 second(s)
[2021-07-29 12:01:21,593] {<string>:49} INFO - Waiting for migrations... 2 second(s)
[2021-07-29 12:01:22,595] {<string>:49} INFO - Waiting for migrations... 3 second(s)
[2021-07-29 12:01:23,596] {<string>:49} INFO - Waiting for migrations... 4 second(s)
[2021-07-29 12:01:24,597] {<string>:49} INFO - Waiting for migrations... 5 second(s)
[2021-07-29 12:01:25,598] {<string>:49} INFO - Waiting for migrations... 6 second(s)
[2021-07-29 12:01:26,600] {<string>:49} INFO - Waiting for migrations... 7 second(s)
[2021-07-29 12:01:27,601] {<string>:49} INFO - Waiting for migrations... 8 second(s)
[2021-07-29 12:01:28,602] {<string>:49} INFO - Waiting for migrations... 9 second(s)
[2021-07-29 12:01:29,603] {<string>:49} INFO - Waiting for migrations... 10 second(s)
[2021-07-29 12:01:30,604] {<string>:49} INFO - Waiting for migrations... 11 second(s)
[2021-07-29 12:01:31,606] {<string>:49} INFO - Waiting for migrations... 12 second(s)
[2021-07-29 12:01:32,607] {<string>:49} INFO - Waiting for migrations... 13 second(s)
[2021-07-29 12:01:33,608] {<string>:49} INFO - Waiting for migrations... 14 second(s)
[2021-07-29 12:01:34,610] {<string>:49} INFO - Waiting for migrations... 15 second(s)
[2021-07-29 12:01:35,611] {<string>:49} INFO - Waiting for migrations... 16 second(s)
[2021-07-29 12:01:36,612] {<string>:49} INFO - Waiting for migrations... 17 second(s)
[2021-07-29 12:01:37,613] {<string>:49} INFO - Waiting for migrations... 18 second(s)
[2021-07-29 12:01:38,614] {<string>:49} INFO - Waiting for migrations... 19 second(s)
[2021-07-29 12:01:39,615] {<string>:49} INFO - Waiting for migrations... 20 second(s)
[2021-07-29 12:01:40,616] {<string>:49} INFO - Waiting for migrations... 21 second(s)
[2021-07-29 12:01:41,617] {<string>:49} INFO - Waiting for migrations... 22 second(s)
[2021-07-29 12:01:42,618] {<string>:49} INFO - Waiting for migrations... 23 second(s)
[2021-07-29 12:01:43,619] {<string>:49} INFO - Waiting for migrations... 24 second(s)
[2021-07-29 12:01:44,621] {<string>:49} INFO - Waiting for migrations... 25 second(s)
[2021-07-29 12:01:45,622] {<string>:49} INFO - Waiting for migrations... 26 second(s)
[2021-07-29 12:01:46,623] {<string>:49} INFO - Waiting for migrations... 27 second(s)
[2021-07-29 12:01:47,625] {<string>:49} INFO - Waiting for migrations... 28 second(s)
[2021-07-29 12:01:48,626] {<string>:49} INFO - Waiting for migrations... 29 second(s)
[2021-07-29 12:01:49,628] {<string>:49} INFO - Waiting for migrations... 30 second(s)
[2021-07-29 12:01:50,628] {<string>:49} INFO - Waiting for migrations... 31 second(s)
[2021-07-29 12:01:51,630] {<string>:49} INFO - Waiting for migrations... 32 second(s)
[2021-07-29 12:01:52,631] {<string>:49} INFO - Waiting for migrations... 33 second(s)
[2021-07-29 12:01:53,632] {<string>:49} INFO - Waiting for migrations... 34 second(s)
[2021-07-29 12:01:54,634] {<string>:49} INFO - Waiting for migrations... 35 second(s)
[2021-07-29 12:01:55,635] {<string>:49} INFO - Waiting for migrations... 36 second(s)
[2021-07-29 12:01:56,636] {<string>:49} INFO - Waiting for migrations... 37 second(s)
[2021-07-29 12:01:57,637] {<string>:49} INFO - Waiting for migrations... 38 second(s)
[2021-07-29 12:01:58,638] {<string>:49} INFO - Waiting for migrations... 39 second(s)
[2021-07-29 12:01:59,639] {<string>:49} INFO - Waiting for migrations... 40 second(s)
[2021-07-29 12:02:00,641] {<string>:49} INFO - Waiting for migrations... 41 second(s)
[2021-07-29 12:02:01,641] {<string>:49} INFO - Waiting for migrations... 42 second(s)
[2021-07-29 12:02:02,642] {<string>:49} INFO - Waiting for migrations... 43 second(s)
[2021-07-29 12:02:03,644] {<string>:49} INFO - Waiting for migrations... 44 second(s)
[2021-07-29 12:02:04,644] {<string>:49} INFO - Waiting for migrations... 45 second(s)
[2021-07-29 12:02:05,646] {<string>:49} INFO - Waiting for migrations... 46 second(s)
[2021-07-29 12:02:06,647] {<string>:49} INFO - Waiting for migrations... 47 second(s)
[2021-07-29 12:02:07,648] {<string>:49} INFO - Waiting for migrations... 48 second(s)
[2021-07-29 12:02:08,649] {<string>:49} INFO - Waiting for migrations... 49 second(s)
[2021-07-29 12:02:09,651] {<string>:49} INFO - Waiting for migrations... 50 second(s)
[2021-07-29 12:02:10,651] {<string>:49} INFO - Waiting for migrations... 51 second(s)
[2021-07-29 12:02:11,652] {<string>:49} INFO - Waiting for migrations... 52 second(s)
[2021-07-29 12:02:12,654] {<string>:49} INFO - Waiting for migrations... 53 second(s)
[2021-07-29 12:02:13,655] {<string>:49} INFO - Waiting for migrations... 54 second(s)
[2021-07-29 12:02:14,656] {<string>:49} INFO - Waiting for migrations... 55 second(s)
[2021-07-29 12:02:15,657] {<string>:49} INFO - Waiting for migrations... 56 second(s)
[2021-07-29 12:02:16,659] {<string>:49} INFO - Waiting for migrations... 57 second(s)
[2021-07-29 12:02:17,659] {<string>:49} INFO - Waiting for migrations... 58 second(s)
[2021-07-29 12:02:18,660] {<string>:49} INFO - Waiting for migrations... 59 second(s)
[2021-07-29 12:02:19,662] {<string>:49} INFO - Waiting for migrations... 60 second(s)
Traceback (most recent call last):
File "<string>", line 46, in <module>
TimeoutError: There are still unapplied migrations after 60 seconds.
I did some debugging and found that it is using the default sql_alchemy_conn
value from config file which is basically with sqlite engine.
Shall I update this script and fetch value manually from the Vault backend or am I missing some configuration?
I made it work (for now) by manually fetching the config value from Vault backend and creating the engine, instead of using settings.engine.connect()
. Here is my wait-for-migration-command
template
{{ define "wait-for-migrations-command" }}
{{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}}
- python
- -c
- |
import airflow
import logging
import os
import time
from airflow.providers.hashicorp.secrets.vault import VaultBackend
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy.engine import create_engine
try:
sql_alchemy_conn = VaultBackend(
config_path="applications/secrets/airflow/config/",
auth_type="kubernetes",
kubernetes_role="kubernetes_role",
).get_config(key="sql_alchemy_conn")
except Exception:
raise
package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, "migrations")
config = Config(os.path.join(package_dir, "alembic.ini"))
config.set_main_option("script_location", directory)
config.set_main_option("sqlalchemy.url", sql_alchemy_conn)
script_ = ScriptDirectory.from_config(config)
engine = create_engine(sql_alchemy_conn)
timeout = 60
with engine.connect() as connection:
context = MigrationContext.configure(connection)
ticker = 0
while True:
source_heads = set(script_.get_heads())
db_heads = set(context.get_current_heads())
if source_heads == db_heads:
break
if ticker >= timeout:
raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
ticker += 1
time.sleep(1)
logging.info("Waiting for migrations... %s second(s)", ticker)
{{- end }}
I am sure there must be some other way to fix this. Will keep looking.