I am using async postgres session and running into an issue with inserting and pulling data between 2 of my tables.
The customers table is a standalone table that doesn't have a relationship with other tables. This table gets updated independently by another insert.
For the quotes table, I need to be able to insert the quote with the corresponding customer ID (either the unique customer number or the id column in the customer table). When selecting data, I want to join to the customers table and pull the data back nested like the below.
class Customers(Base):
    """ORM mapping for the ``customers`` table.

    The mapping declares no relationships or foreign keys of its own;
    ``unique_account_number`` carries a unique constraint.
    """
    __tablename__ = "customers"
    # Table-level unique constraint on the account number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to the current local (naive) time on insert and is set again
    # whenever SQLAlchemy emits an UPDATE for the row.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current local (naive) time when the row is inserted.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
class Quotes(Base):
    """ORM mapping for the ``quotes`` table.

    A quote stores the customer's ``unique_account_number`` and exposes the
    matching ``Customers`` row through the ``customer`` relationship. No
    database-level foreign key is declared, so the join condition and the
    referring column are both spelled out on the relationship itself.
    """

    __tablename__ = "quotes"
    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # Account number of the owning customer; matched against
    # Customers.unique_account_number when loading ``customer``.
    unique_account_number: Mapped[str]
    # The referring ("foreign") column is the one on Quotes, which makes this
    # a many-to-one relationship: several quotes may point at one customer,
    # and deleting or re-pointing a quote never modifies the Customers row.
    customer = relationship(
        Customers,
        primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
        foreign_keys="Quotes.unique_account_number",
        uselist=False,
    )
    # Defaults to the current local (naive) time on insert and is set again
    # whenever SQLAlchemy emits an UPDATE for the row.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current local (naive) time when the row is inserted.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
Expected JSON for an insert into the quotes table:
{
"origin": "Ney York City",
"destination": "Houston",
"unique_account_number": "A9457HDA"
}
Expected JSON from a select statement on the quotes table:
{
"origin": "string",
"destination": "string",
"customer": {
"unique_account_number": "ABCD1234",
"customer_name": "Customer LLC"
}
}
How can I achieve this through using the ORM?
import asyncio
import datetime
import json
import sys
from typing import Sequence

from sqlalchemy import MetaData, select, UniqueConstraint, Integer, DateTime
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship, joinedload
from sqlalchemy.sql import update, func
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class shared by the ORM models in this script."""
class Customers(Base):
    """ORM mapping for the ``customers`` table.

    The mapping declares no relationships or foreign keys of its own;
    ``unique_account_number`` carries a unique constraint.
    """
    __tablename__ = "customers"
    # Table-level unique constraint on the account number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    # Integer primary key.
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to the current local (naive) time on insert and is set again
    # whenever SQLAlchemy emits an UPDATE for the row.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current local (naive) time when the row is inserted.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
class Quotes(Base):
    """ORM mapping for the ``quotes`` table.

    A quote stores the customer's ``unique_account_number`` and exposes the
    matching ``Customers`` row through the ``customer`` relationship. No
    database-level foreign key is declared, so the join condition and the
    referring column are both spelled out on the relationship itself.
    """

    __tablename__ = "quotes"
    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # Account number of the owning customer; matched against
    # Customers.unique_account_number when loading ``customer``.
    unique_account_number: Mapped[str]
    # The referring ("foreign") column is the one on Quotes, which makes this
    # a many-to-one relationship: several quotes may point at one customer,
    # and deleting or re-pointing a quote never modifies the Customers row.
    # Marking the Customers column as foreign instead would turn this into a
    # one-to-many whose unit-of-work rewrites the customer's account number.
    customer = relationship(
        Customers,
        primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
        foreign_keys="Quotes.unique_account_number",
        uselist=False,
    )
    # Defaults to the current local (naive) time on insert and is set again
    # whenever SQLAlchemy emits an UPDATE for the row.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current local (naive) time when the row is inserted.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)
async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    """Insert one sample quote and one sample customer sharing an account number.

    Both rows are added inside a single ``session.begin()`` block, so they
    are committed together when that block exits without an error.

    Args:
        async_session: factory used to open the session for the inserts.
    """
    sample_quote = Quotes(
        origin="Ney York City",
        destination="Houston",
        unique_account_number="A9457HDA",
    )
    sample_customer = Customers(
        customer_name="Customer LLC",
        unique_account_number="A9457HDA",
    )
    async with async_session() as session, session.begin():
        # Same registration order as adding the quote first, then the customer.
        session.add_all([sample_quote, sample_customer])
async def select_objects(
    async_session: async_sessionmaker[AsyncSession],
) -> Sequence[Quotes]:
    """Load every quote together with its customer.

    The customer is fetched in the same SELECT through a joined eager load,
    so ``quote.customer`` is already populated on the returned objects and
    does not need a lazy load after the session has been closed.

    Args:
        async_session: factory used to open the session for the query.

    Returns:
        All ``Quotes`` rows, each with ``customer`` loaded.
    """
    stmt = select(Quotes).options(joinedload(Quotes.customer))
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(stmt)
            # unique() collapses any duplicate Quotes entities produced by
            # the JOIN before the ORM objects are extracted.
            return result.unique().scalars().all()
def get_engine():
    """Create the async PostgreSQL engine from command-line arguments.

    The first three arguments after the script name are read as
    ``USERNAME PASSWORD DATABASE``; any further arguments are ignored.
    No host or port is set on the URL.

    Returns:
        The engine created by ``create_async_engine`` with SQL echo enabled.

    Raises:
        ValueError: if fewer than three arguments were supplied.
    """
    from sqlalchemy.engine import URL

    credentials = sys.argv[1:4]
    if len(credentials) != 3:
        raise ValueError("expected 3 arguments: USERNAME PASSWORD DATABASE")
    username, password, db = credentials
    # Build the URL from components rather than string formatting so that
    # special characters in the credentials (e.g. "@", "/" or ":") cannot
    # corrupt it.
    url = URL.create(
        "postgresql+asyncpg",
        username=username,
        password=password,
        database=db,
    )
    return create_async_engine(url, echo=True)
def serialize_quote(quote):
    """Convert a quote and its customer into a JSON-serializable dict.

    Args:
        quote: object exposing ``origin``, ``destination`` and ``customer``.
            ``customer`` is either ``None`` or an object exposing
            ``unique_account_number`` and ``customer_name``.

    Returns:
        A dict with the keys ``origin``, ``destination`` and ``customer``.
        ``customer`` is a nested dict with ``unique_account_number`` and
        ``customer_name``, or ``None`` when the quote has no customer.
    """
    quote_dict = {attr: getattr(quote, attr) for attr in ("origin", "destination")}
    customer = quote.customer
    if customer is None:
        # A quote whose account number matches no customer row has no
        # related object; emit an explicit null instead of failing.
        quote_dict["customer"] = None
    else:
        quote_dict["customer"] = {
            attr: getattr(customer, attr)
            for attr in ("unique_account_number", "customer_name")
        }
    return quote_dict
async def async_main() -> None:
    """Create the tables, insert the sample rows, and print every quote as JSON.

    The engine is disposed even if one of the steps raises, so its
    connection pool is not left open on failure.
    """
    engine = get_engine()
    try:
        # expire_on_commit=False keeps loaded attributes readable after the
        # transaction in select_objects has committed.
        async_session = async_sessionmaker(engine, expire_on_commit=False)
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        await insert_objects(async_session)
        quotes = await select_objects(async_session)
        print([json.dumps(serialize_quote(quote)) for quote in quotes])
    finally:
        await engine.dispose()
# Run the demo only when executed as a script, so importing this module
# (e.g. to reuse the models) does not connect to the database.
if __name__ == "__main__":
    asyncio.run(async_main())
``