I have a MySQL database table with a date-time column and an associated data column. I need the maximum value of the data column for multiple pre-defined date-time ranges. Let's say I have a row for every 5 minutes and I need the maximum value of the data column for each consecutive 12-hour range. I need to do it in one query and I need it done with SQLAlchemy's ORM functions.
I've found this answer which does what I need. However, I am struggling to implement it using SQLAlchemy's ORM approach. Specifically, the parenthesized subquery built with union_all is where my understanding of SQLAlchemy is lacking.
I think you may need to apply subquery(), but it would help to include the code you have tried. Here is a version that seems to work, but I don't know a lot about datetime handling in MySQL, and datetime handling can get really complicated, so it is better to research that separately. This uses the "core" and "ORM" layers together in SQLAlchemy 2+ by utilizing select().
I roll back the transaction at the end of this example so that it can be run over and over again.
from datetime import datetime

from sqlalchemy import Column, DateTime, MetaData, create_engine
from sqlalchemy.dialects.mysql import BIGINT
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.sql import and_, bindparam, func, select

# DB_USER, DB_PASS, DB_IP and DB_PORT are placeholders; define them for your own server.
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_IP}:{DB_PORT}', echo=True)
Base = declarative_base(metadata=MetaData(schema="testdb"))

class Sample(Base):
    __tablename__ = "samples"
    id = Column(BIGINT(20), primary_key=True, autoincrement=True, nullable=False)
    recorded_on = Column(DateTime, nullable=False)
    recorded_value = Column(BIGINT(20), nullable=False)

Base.metadata.create_all(engine)

# Ranges run from the top of the hour (minute=0, second=0) to the top
# of the next hour, so we can test >= range1 and < range2.
# The final hour needs range2 to fall on the next day.
RECORDED_ON_RANGES = [
    (
        datetime(year=2023, month=10, day=11, hour=hour, minute=0, second=0),
        datetime(year=2023, month=10, day=11, hour=hour + 1, minute=0, second=0),
    )
    for hour in range(23)
] + [
    (
        datetime(year=2023, month=10, day=11, hour=23, minute=0, second=0),
        datetime(year=2023, month=10, day=12, hour=0, minute=0, second=0),
    )
]

def get_test_recorded_value(hour, minute):
    # Try to include a few highs and lows in specific hours, and then a fallback.
    if hour == 12 and minute % 2 == 0:
        return 100
    elif hour in (6, 12):
        return 3
    elif hour == 13 and minute % 3 == 0:
        return 4
    else:
        return 1

def get_test_samples():
    # Make samples every minute for hours 0 to 19; leave hours 20 to 23 empty to test LEFT OUTER JOIN.
    return (
        Sample(
            recorded_on=datetime(year=2023, month=10, day=11, hour=hour, minute=minute, second=0),
            recorded_value=get_test_recorded_value(hour, minute),
        )
        for minute in range(60)
        for hour in range(20)
    )

with Session(engine) as session, session.begin():
    session.add_all(get_test_samples())
    session.flush()

    # Apply subquery() to allow subsequent reference to columns in the union.
    # Only label the first select's columns, like the example.
    range_union = select(
        bindparam(None, value=RECORDED_ON_RANGES[0][0], type_=DateTime).label("range1"),
        bindparam(None, value=RECORDED_ON_RANGES[0][1], type_=DateTime).label("range2"),
    ).union_all(*[
        select(
            bindparam(None, value=range1, type_=DateTime),
            bindparam(None, value=range2, type_=DateTime),
        )
        for range1, range2 in RECORDED_ON_RANGES[1:]
    ]).subquery()

    q = select(
        range_union.c.range1,
        range_union.c.range2,
        func.max(Sample.recorded_value),
    ).outerjoin(
        Sample,
        and_(Sample.recorded_on >= range_union.c.range1, Sample.recorded_on < range_union.c.range2),
    ).group_by(range_union.c.range1, range_union.c.range2)

    for (range1, range2, max_value) in session.execute(q):
        print(range1, range2, max_value)

    # Undo the inserts so the example can be rerun.
    session.rollback()