I want to run two QuestDB queries: one for downsampling data older than three weeks, aggregating at 30-second intervals, and one to delete the partition with the raw data. At the moment I just execute them from a cron job using curl and the REST API:
curl -G --data-urlencode "query=INSERT INTO downsampled_tb SELECT timestamp, min(price) as low, max(price) as high, first(price) as open, last(price) as close FROM trades WHERE timestamp < dateadd('d', -21, now());"
curl -G --data-urlencode "ALTER TABLE trades DROP PARTITION WHERE timestamp < dateadd('d', -21, now());"
I would like to use Dagster instead, so I can backfill past dates and have better error control. I checked Dagster and I cannot see any QuestDB operator. What would be the best way to do this?
Since Dagster supports writing operators in Python, we can just define a resource based on psycopg, and then define @ops using that resource:
import psycopg
from datetime import datetime, timedelta
from dagster import op, graph, resource, Out
# Define a resource wrapping a psycopg connection to QuestDB over PGWire
@resource(config_schema={"connection_string": str})
def postgres_resource(context):
    # autocommit so each statement is applied immediately
    conn = psycopg.connect(context.resource_config["connection_string"], autocommit=True)
    try:
        yield conn
    finally:
        conn.close()
# Downsample one day of raw trades into 30-second OHLC rows
@op(required_resource_keys={"postgres"}, out=Out(str))
def downsample_data(context, execution_date: str):
    sql_query = f"""
        INSERT INTO downsampled_tb
        SELECT timestamp, min(price) AS low, max(price) AS high,
               first(price) AS open, last(price) AS close
        FROM trades WHERE timestamp IN '{execution_date}' SAMPLE BY 30s;
        """
    with context.resources.postgres.cursor() as cursor:
        cursor.execute(sql_query)
    return execution_date
# Drop the raw-data partition for the same day
@op(required_resource_keys={"postgres"}, out=Out(str))
def remove_partition(context, execution_date: str):
    sql_query = f"ALTER TABLE trades DROP PARTITION LIST '{execution_date}';"
    with context.resources.postgres.cursor() as cursor:
        cursor.execute(sql_query)
    return execution_date
# Dummy implementation: process the day that is exactly three weeks old.
# With Dagster partitions this would come from the partition key instead.
@op(out=Out(str))
def get_execution_date() -> str:
    return (datetime.now() - timedelta(days=21)).strftime("%Y-%m-%d")
@graph
def questdb_downsampler():
    execution_date = get_execution_date()
    remove_partition(downsample_data(execution_date))
Note that in this case I am using a connection_string that should be provided when launching the job, and a dummy get_execution_date, which should probably be derived from Dagster partitions so we can backfill properly.
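To bind the resource to the graph and pass the connection string at launch, we can turn the graph into a job. A minimal sketch; the job name is arbitrary, and the connection string assumes QuestDB's default PGWire settings (user admin, password quest, port 8812, database qdb), so adjust as needed:

    # Convert the graph into a launchable job, binding the resource
    questdb_job = questdb_downsampler.to_job(
        name="questdb_downsampler_job",
        resource_defs={"postgres": postgres_resource},
    )

    # Quick local test; the same run config can be supplied from the
    # Dagster UI or a YAML file when launching for real
    result = questdb_job.execute_in_process(
        run_config={
            "resources": {
                "postgres": {
                    "config": {
                        "connection_string": "postgresql://admin:quest@localhost:8812/qdb"
                    }
                }
            }
        }
    )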
I am making both @ops return the execution date, so I can pass the output of the first as a parameter to the second and declare a dependency. If the first op raises an error, the second will not run.
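For the backfilling part, here is a sketch of how the dummy op could be swapped for a config-driven one fed by Dagster's daily partitions; the names partitioned_config and partitioned_job, the start_date, and the connection string are placeholders, not anything prescribed by Dagster or QuestDB:

    from datetime import datetime
    from dagster import op, Out, daily_partitioned_config

    # Config-driven replacement for the dummy op: the date comes from run config
    @op(config_schema={"date": str}, out=Out(str))
    def get_execution_date(context) -> str:
        return context.op_config["date"]

    # One partition per day; each partition's run config carries its own date
    @daily_partitioned_config(start_date=datetime(2023, 1, 1))
    def partitioned_config(start: datetime, _end: datetime):
        return {
            "ops": {
                "get_execution_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
            },
            "resources": {
                "postgres": {
                    "config": {
                        "connection_string": "postgresql://admin:quest@localhost:8812/qdb"
                    }
                }
            },
        }

    partitioned_job = questdb_downsampler.to_job(
        name="questdb_downsampler_partitioned",
        resource_defs={"postgres": postgres_resource},
        config=partitioned_config,
    )

With this in place, each day shows up as a partition in the Dagster UI, and backfilling past dates is just launching runs for the missing partitions.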