Tags: google-bigquery, dbt

Execute dynamic SQL within context of BigQuery SELECT


I have a pretty special use case - it may seem crazy, but I think it's a reasonable extreme edge case. I suspect the answer is "it's just not possible in any way", but I'm asking all the same in case someone has an idea.

We have a huge number of KPIs. I want to define them in a central dbt seed (a relatively static table). The schema would be something like:

  • some
  • join
  • keys
  • kpi_name (e.g. "actions")
  • kpi_sql_expr (e.g. IF(`action_type` = "foo", `action_count`, 0))

I then want to join this to my BigQuery table (dbt model) on the join keys, execute the kpi_sql_expr, and store the value in a column named after kpi_name (well, I'd probably store the information in a single JSON column called kpi, because there could be more than one kpi_name for the row in question - but that's a specialisation of this use case and not core to the question). This would allow me to maintain the logic in one place and use a generic pattern in the large number of models where I need to do this sort of thing, without having to hand-curate KPIs in many, many places.

The problem I am running into as I've thought about how to solve this (and I've considered some ghastly implementations using all manner of SQL/JavaScript UDFs) is that, fundamentally, I can't run a procedural statement (EXECUTE IMMEDIATE) in a SELECT context, no matter how I try to hack it. Jinja can't help me either, because this sql_expr is row-level.
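
For concreteness, EXECUTE IMMEDIATE works as a standalone script statement, but there is nothing equivalent that can be evaluated per row inside a SELECT list. A minimal sketch (the table name is made up):

    -- Valid: EXECUTE IMMEDIATE as a top-level script statement
    -- (`my_project.my_dataset.my_table` is a made-up table name)
    DECLARE kpi_expr STRING DEFAULT "IF(`action_type` = 'foo', `action_count`, 0)";

    EXECUTE IMMEDIATE FORMAT("""
      SELECT %s AS actions
      FROM `my_project.my_dataset.my_table`
    """, kpi_expr);

    -- Invalid: there is no SELECT-level equivalent, i.e. nothing like
    -- SELECT EVAL(kpi_sql_expr) that evaluates the string row by row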

The ONLY way I can conceive of this being possible is to use a dbt post-hook to call a BigQuery stored procedure, where I could do something ghastly like a DML update that populates this column (presumably added as NULL in my model) based on a combination of procedural and SELECT statements. Anything like this in a post-hook is a total abuse - but I can't create some new table either, otherwise it would be missing from dbt's lineage - so I'd have to alter the relation just created by dbt.
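
For what it's worth, the sort of thing I was imagining is sketched below - purely hypothetical, and the stored procedure populate_kpi_column doesn't exist:

    {{ config(
        post_hook = "CALL `my_dataset.populate_kpi_column`('{{ this }}')"
    ) }}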

Actually, I'm not even sure how I could achieve this within a stored procedure - I still have the fundamental problem of needing EXECUTE IMMEDIATE within a SELECT context.

Welcoming any ideas. Please trust that I have a good reason for wanting this - even if the conclusion ends up being "there's no sane way to do this".


Solution

  • I didn't fully answer my question, but I did come up with a workaround and will share it for posterity!

    Instead of trying to run some per-row dynamic SQL - which is the triad of impossible, a bad idea, and non-performant - I decided that the lesser of two evils would be to dynamically construct some beastly SQL using a Jinja macro.

    It works like this:

    • there is a seed file that maps dbt model names and various criteria to KPI names, expressions, and aggregation functions (the functions wrap the expressions - necessary when the macro is used in an aggregate context, as determined by the aggregate=true macro parameter); a made-up sketch of the seed follows this list
    • a macro is responsible for querying the seed data and producing some brilliantly awful dynamic SQL (snippet posted below)
    • to make use of it, my dbt model simply needs to call the macro in its SELECT statement
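
    For illustration only, a few made-up rows of the kpis seed, using the column names the macro below expects, might look like this:

    table_name,APClient,parent_kpi_type,parent_campaign_id,kpi_name,sql_expression,sql_aggregation_function
    my_model,acme,,,actions,"IF(`action_type` = 'foo', `action_count`, 0)",SUM
    my_model,,,,actions,0,SUM
    my_model,acme,,,revenue,`spend` * 0.1,SUM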

    Macro:

    {% macro add_kpis_column(aggregate=true) -%}
    
    {%- set sql_statement -%}
      WITH `kpi_seed_sql_expression` AS (
        SELECT * EXCEPT(`sql_expression`),
          CONCAT(
            "WHEN ",
            IFNULL(
              NULLIF(
                ARRAY_TO_STRING([
                  IF(`APClient` IS NOT NULL, CONCAT("`APClient` = '", `APClient`, "'"), NULL),
                  IF(`parent_kpi_type` IS NOT NULL, CONCAT("`parent_kpi_type` = '", `parent_kpi_type`, "'"), NULL),
                  IF(`parent_campaign_id` IS NOT NULL, CONCAT("`parent_campaign_id` = ", `parent_campaign_id`), NULL)
                ], " AND "),
                ""
              ),
              "TRUE"
            ),
            " THEN ",
            {{ "CONCAT(`sql_aggregation_function`, '(', `sql_expression`, ')')" if aggregate else "`sql_expression`" }}
          ) AS `sql_expression`
        FROM {{ ref("kpis") }}
        WHERE `table_name` = "{{ this.identifier }}"
      ),
    
      `kpi_name_sql_expression` AS (
        SELECT
          `kpi_name`,
          CONCAT(
            "CASE ",
            STRING_AGG(`sql_expression`, "\n" ORDER BY `APClient` DESC, `parent_kpi_type` DESC, `parent_campaign_id` DESC),
            " END AS `",
            `kpi_name`,
            "`"
          ) AS `sql_expression`
        FROM `kpi_seed_sql_expression`
        GROUP BY `kpi_name`
      )
    
      SELECT CONCAT(
        "TO_JSON(STRUCT(",
        STRING_AGG(`sql_expression`, ", "),
        "))"
      ) AS `sql_expression`
      FROM `kpi_name_sql_expression`
    
    {%- endset -%}
    
    {%- if execute %}
        {%- set result = run_query(sql_statement) -%}
    {% else %}
        {%- set result = [[]] -%}
    {% endif -%}
    
    {{ result[0][0] }} AS `kpis`
    
    {%- endmacro %}
    

    Example usage from dbt model:

    SELECT *, {{ add_kpis_column(aggregate=false) }}
    FROM ...
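
    And in an aggregate context (the grouping key here is made up), roughly:

    SELECT `APClient`, {{ add_kpis_column(aggregate=true) }}
    FROM ...
    GROUP BY `APClient`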
    

    This has the effect of adding a single column called kpis, whose value is a JSON object that maps KPI names to values.
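
    To make that concrete: with seed rows like the made-up sketch above and aggregate=false, the generated column expression comes out roughly like this (illustrative only):

    TO_JSON(STRUCT(
      CASE
        WHEN `APClient` = 'acme' THEN IF(`action_type` = 'foo', `action_count`, 0)
        WHEN TRUE THEN 0
      END AS `actions`,
      CASE
        WHEN `APClient` = 'acme' THEN `spend` * 0.1
      END AS `revenue`
    )) AS `kpis`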

    Thinking about it, this doesn't need to be JSON - it could be a STRUCT.
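
    If you went the STRUCT route, the only change (untested sketch) would be to the final SELECT of the macro's generated query:

    SELECT CONCAT(
      "STRUCT(",
      STRING_AGG(`sql_expression`, ", "),
      ")"
    ) AS `sql_expression`
    FROM `kpi_name_sql_expression`

    Anyway, there you have it.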