I have a docker setup consisting of three containers. A) uvicorn/fastAPI app B) mongoDB C) Postgres DB
All three working. This is a problem with alembic, hosted in container A with the rest of my app, making changes to postgres in container C.
I have a DatabaseSession class that connects to the DB. I can connect just fine and run commands. It does seem to be responding. Here is the connect method within that class:
def _connect_db(self, db_schema_override: str = None):
schema = self.schema if not db_schema_override else db_schema_override
try:
engine = create_engine(
self.db_url, connect_args={"options": "-csearch_path={}".format(schema)}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
session = SessionLocal()
metadata = MetaData()
metadata.reflect(bind=engine)
metadata.create_all(engine)
return engine, session
except (AttributeError, ValueError):
raise
here is my alembic migrations file
def upgrade_db(db_schema: str, revision: str = "head") -> None:
db = DatabaseSession(db_schema)
db.db_data.maintenance_mode = True
db.db_data.save()
_config = config.Config("path/to/file/alembic.ini")
_config.set_main_option("sqlalchemy.url", db.db_url)
with db.engine.begin() as cnxn:
_config.attributes["connection"] = cnxn
_config.attributes["schema"] = db_schema.lower()
command.upgrade(_config, revision)
db.db_data.maintenance_mode = False
db.db_data.save()
here is my alembic env.py file run_migrations_online function:
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
db_schema = config.attributes.get("schema", "public")
with connectable.connect() as connection:
connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{db_schema}"'))
connection.execute(text(f"SET search_path TO '{db_schema}'"))
connection.dialect.default_schema_name = db_schema
context.configure(
connection=connection,
target_metadata=target_metadata,
include_schemas=True,
)
with context.begin_transaction():
context.run_migrations()
as you can see, Im utilizing schemas within each database to further seperate concerns. Im basically using mongodb to store DB records including database name and schema. Ive checked permissions on postgres, i can log into postgres and run commands as the same user and create tables.
I can access postgres from python using my DatabaseSession class and run commands as that same user. But I cannot seem to create anything.
When i run a 'create table' command using my DatabaseSession class from python interpreter, it registers the command was sent. But if i run \dt or anything in the psql CLI in another window, no changes show up. If i try to rerun the 'create table' command using the DatabaseSession class again, itll this time say this table already exists.
Its like its not committing the command. If i kill the session, or do a rollback, I can rerun the command just fine once. Am i supposed to be commiting this?
ive also tried just running alembic upgrade head. Same issue. It says it ran all the migrations just fine. no errors. but no changes persist in the DB.
additionally, Alembic does not appear to be creating the versions table where it houses the migration ID.
ive done this EXACT same setup in the past (earlier version of sqlalchemy). Im not exactly sure whats so different now that i cant seem to get this work.
any help is greatly appreciated. thank you
My 'event'
example below appears to be working but you're right. Your example just seems to rollback when you set echo=True
on the engine.
Your example doesn't seem to work in SqlAlchemy>=2
only in SqlAlchemy<2
.
After thinking about this it seems if you add
connection.commit()
after you set the dialect.default_schema_name = db_schema
then your code does seem to work in sqlalchemy 2.
I created a new alembic project, set the sqlalchemy url to a test postgresql db.
Then I:
alembic revision --autogenerate
alembic upgrade head
Your version seems to rollback.
I lifted the @event
from
setting-alternate-search-paths-on-connect.
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.sql import text
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.attributes['schema'] = 'myschema'
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from sqlalchemy import MetaData
from sqlalchemy import Table, Column, Integer
metadata = MetaData()
users_t = Table(
"users",
metadata,
Column("user_id", Integer, primary_key=True),
)
target_metadata = metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
echo=True,
)
db_schema = config.attributes.get("schema", "public")
# Toggle this between 'event' and 'inline'
schema_strategy = 'inline'
if schema_strategy == 'event':
from sqlalchemy import event
@event.listens_for(connectable, "connect", insert=True)
def set_search_path(dbapi_connection, connection_record):
existing_autocommit = dbapi_connection.autocommit
dbapi_connection.autocommit = True
cursor = dbapi_connection.cursor()
cursor.execute(f'CREATE SCHEMA IF NOT EXISTS {db_schema}')
cursor.execute(f"SET SESSION search_path='{db_schema}'")
cursor.close()
dbapi_connection.autocommit = existing_autocommit
with connectable.connect() as connection:
if schema_strategy == 'inline':
connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{db_schema}"'))
connection.execute(text(f"SET search_path TO '{db_schema}'"))
connection.dialect.default_schema_name = db_schema
context.configure(
connection=connection, target_metadata=target_metadata,
include_schemas=True,
)
with context.begin_transaction() as trans:
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()