Tags: database, time-series, questdb

Scheduling two linked QuestDB queries with Dagster


I want to run two QuestDB queries: one to downsample data older than three weeks, aggregating at 30-second intervals, and one to delete the partition containing the raw data. At the moment I just execute them from a cron job using curl and the REST API:

curl -G --data-urlencode "query=INSERT INTO downsampled_tb SELECT timestamp, min(price) as low, max(price) as high, first(price) as open, last(price) as close FROM trades WHERE timestamp < dateadd('d', -21, now()) SAMPLE BY 30s;" http://localhost:9000/exec

curl -G --data-urlencode "query=ALTER TABLE trades DROP PARTITION WHERE timestamp < dateadd('d', -21, now());" http://localhost:9000/exec

I would like to use Dagster instead, so I can backfill past dates and have better error control. I checked Dagster and cannot see any QuestDB operator. What would be the best way to do this?


Solution

  • Since Dagster supports writing operators in Python, we can define a resource based on psycopg and then define @ops that use that resource:

    import psycopg
    
    from dagster import op, graph, resource, Out
    
    # Define a resource that yields a psycopg connection
    # (QuestDB speaks the PostgreSQL wire protocol)
    @resource(config_schema={"connection_string": str})
    def postgres_resource(context):
        conn = psycopg.connect(context.resource_config["connection_string"])
        try:
            yield conn
        finally:
            conn.close()
    
    # Define operations
    @op(required_resource_keys={'postgres'}, out=Out(str))
    def downsample_data(context, execution_date: str):
        # QuestDB interval syntax timestamp IN 'yyyy-MM-dd' matches the whole day
        sql_query = f"""
            INSERT INTO downsampled_tb SELECT timestamp, min(price) as low, 
            max(price) as high, first(price) as open, last(price) as close 
            FROM trades WHERE timestamp IN '{execution_date}' SAMPLE BY 30s;
        """
        with context.resources.postgres.cursor() as cursor:
            cursor.execute(sql_query)
        context.resources.postgres.commit()
        return execution_date
    
    @op(required_resource_keys={'postgres'}, out=Out(str))
    def remove_partition(context, execution_date: str):
        sql_query = f"""
            ALTER TABLE trades DROP PARTITION 
            WHERE timestamp = '{execution_date}';
        """
        with context.resources.postgres.cursor() as cursor:
            cursor.execute(sql_query)
        context.resources.postgres.commit()
        return execution_date
    
    @op(out=Out(str))
    def get_execution_date():
        # TODO: derive this from the run's partition key so backfills line up
        return "2023-01-01"  # placeholder date
    
    @graph
    def questdb_downsampler():
        execution_date = get_execution_date()
        remove_partition(downsample_data(execution_date))
    
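Until partitions are wired up, one way to make get_execution_date concrete is to compute the cutoff with the standard library. This is only a sketch; the 21-day offset mirrors the dateadd('d', -21, now()) in the original queries, and the function name is illustrative:

```python
from datetime import datetime, timedelta, timezone

def cutoff_date(days_back: int = 21) -> str:
    """Return the UTC date `days_back` days ago, formatted like a daily partition key."""
    return (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y-%m-%d")
```

Returning a plain 'YYYY-MM-DD' string keeps the op output directly usable in the templated queries above.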

    Note that in this case I am using a connection_string that should be provided at launch time, and a dummy get_execution_date, which should probably be derived from partitions so we can backfill properly.
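For reference, a minimal run-config fragment supplying that connection string could look like the following. The host, port, and credentials shown are QuestDB's defaults for its PostgreSQL wire protocol (port 8812, user admin, password quest), so adjust them for your deployment:

```yaml
resources:
  postgres:
    config:
      connection_string: "host=localhost port=8812 user=admin password=quest dbname=qdb"
```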

    I make both @ops return the execution date, so I can pass the output of the first as a parameter to the second and declare a dependency. If the first op fails, the second will not run.
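Since the ops interpolate the execution date into SQL with f-strings, it is worth validating the value before templating. A minimal sketch of that idea, with illustrative function names and a strptime check standing in for proper escaping:

```python
from datetime import datetime

def build_queries(partition_date: str) -> tuple[str, str]:
    """Validate a 'YYYY-MM-DD' partition key, then build both statements."""
    # Raises ValueError if the key is not a plain date, so nothing
    # unexpected can be spliced into the SQL below
    datetime.strptime(partition_date, "%Y-%m-%d")
    downsample = (
        "INSERT INTO downsampled_tb "
        "SELECT timestamp, min(price) as low, max(price) as high, "
        "first(price) as open, last(price) as close "
        f"FROM trades WHERE timestamp IN '{partition_date}' SAMPLE BY 30s;"
    )
    drop = f"ALTER TABLE trades DROP PARTITION WHERE timestamp = '{partition_date}';"
    return downsample, drop
```

Each op could then call this helper with its input and execute the statement it needs, keeping the query text in one place.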