Using a macro to drop old relations no longer in dbt across multiple schemas on BigQuery


Source and inspiration here: Clean your warehouse of old and deprecated models

I'm attempting to adapt the following dbt macro to do two things:

  1. Run on BigQuery
  2. Drop "old" relations from my multiple target schemas

Output of my schemata query is as follows:

SELECT schema_name FROM `my-project.region-us.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;

schema_name
dbt_dev
dbt_dev_stage
dbt_dev_mart
dbt_dev_analytics
dbt_prod
dbt_prod_stage
dbt_prod_mart
dbt_prod_analytics
etc...

and my "adjusted" macro is something like:

{% macro drop_old_relations(dryrun=False) %}

{% if execute %}
  {% set current_models=[] %}

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% do current_models.append(node.name) %}

  {% endfor %}
{% endif %}

{% set cleanup_query %}

      WITH MODELS_TO_DROP AS (
          SELECT
            CASE 
              WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
              WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
            END AS RELATION_TYPE,
            CONCAT( TABLE_CATALOG,".",{{ target.schema }},".", TABLE_NAME) AS RELATION_NAME
          FROM 
            {{ target.database }}.{{ target.schema }}.INFORMATION_SCHEMA.TABLES
          WHERE TABLE_SCHEMA = {{ target.schema }}
            AND TABLE_NAME NOT IN
              ({%- for model in current_models -%}
                  '{{ model.upper() }}'
                  {%- if not loop.last -%}
                      ,
                  {% endif %}
              {%- endfor -%})) 
      SELECT 
        'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
      FROM 
        MODELS_TO_DROP
  {% endset %}

{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}

{% if drop_commands %}
  {% if dryrun | as_bool == False %}
    {% do log('Executing DROP commands...', True) %}
  {% else %}
    {% do log('Printing DROP commands...', True) %}
  {% endif %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dryrun | as_bool == False %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}

{%- endmacro -%}

I am currently running into an issue where the macro doesn't recognize some of my target schemas:

dbt run-operation drop_old_relations --args "{dryrun: True}"

Encountered an error while running operation: Database Error
  Unrecognized name: dbt_dev at [14:32]

Alternatively, I'd be happy to add a schema argument and then iterate over the schemas in a run hook, something like:

dbt run-operation drop_old_relations --args "{schema: dbt_dev_mart, dryrun: True}"

on-run-start:
 - "{% for schema in schemas%}drop_old_relations({{ schema }},False);{% endfor%}"

Solution

  • Great question, I love seeing people adapt this to different warehouses. I'm breaking up my response into two parts, per your question.

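    1. Getting this to work on BigQuery

    I was able to get this working on just my target schema first. We only needed to adjust some of the SQL in the cleanup_query. The schema name has to be rendered as a quoted string literal ('{{ schema }}'); left unquoted, BigQuery reads it as a column name, which is what produces the "Unrecognized name: dbt_dev" error. The name comparison also wraps TABLE_NAME in UPPER(), because the model names are upper-cased before being compared.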

    drop_old_relations.sql

    {% macro drop_old_relations(schema=target.schema, dryrun=False) %}
    
    {# Get the models that currently exist in dbt #}
    {% if execute %}
      {% set current_models=[] %}
    
      {% for node in graph.nodes.values()
         | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
            {% do current_models.append(node.name) %}
        
      {% endfor %}
    {% endif %}
    
    {# Run a query to create the drop statements for all relations in BQ that are NOT in the dbt project #}
    {% set cleanup_query %}
    
          WITH MODELS_TO_DROP AS (
              SELECT
                CASE 
                  WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
                  WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
                END AS RELATION_TYPE,
                CONCAT('{{ schema }}','.',TABLE_NAME) AS RELATION_NAME
              FROM 
                {{ schema }}.INFORMATION_SCHEMA.TABLES
              WHERE TABLE_SCHEMA = '{{ schema }}'
                AND UPPER(TABLE_NAME) NOT IN
                  ({%- for model in current_models -%}
                      '{{ model.upper() }}'
                      {%- if not loop.last -%}
                          ,
                      {% endif %}
                  {%- endfor -%})) 
          SELECT 
            'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
          FROM 
            MODELS_TO_DROP
      {% endset %}
    
    {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
    
    {# Execute each of the drop commands for each relation #}
    
    {% if drop_commands %}
      {% if dryrun | as_bool == False %}
        {% do log('Executing DROP commands...', True) %}
      {% else %}
        {% do log('Printing DROP commands...', True) %}
      {% endif %}
      {% for drop_command in drop_commands %}
        {% do log(drop_command, True) %}
        {% if dryrun | as_bool == False %}
          {% do run_query(drop_command) %}
        {% endif %}
      {% endfor %}
    {% else %}
      {% do log('No relations to clean.', True) %}
    {% endif %}
    
    {%- endmacro -%}
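
    For reference, here is roughly what cleanup_query renders to for a single schema. This is only a sketch, assuming schema is dbt_dev_mart and the project contains just two models named customers and orders (both hypothetical names, used for illustration):

    ```sql
    -- Rendered form of cleanup_query, assuming schema = dbt_dev_mart and
    -- two hypothetical models: customers, orders.
    WITH MODELS_TO_DROP AS (
        SELECT
          CASE
            WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
            WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
          END AS RELATION_TYPE,
          -- The schema is a quoted string literal here, not an identifier.
          CONCAT('dbt_dev_mart', '.', TABLE_NAME) AS RELATION_NAME
        FROM
          dbt_dev_mart.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = 'dbt_dev_mart'
          AND UPPER(TABLE_NAME) NOT IN ('CUSTOMERS', 'ORDERS'))
    SELECT
      'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' AS DROP_COMMANDS
    FROM
      MODELS_TO_DROP
    ```

    Each result row is one DROP statement for a relation that dbt no longer knows about. Because the CASE has no ELSE branch, any TABLE_TYPE other than the two handled values gets a NULL RELATION_TYPE, so it may be worth filtering on TABLE_TYPE if a dataset holds other kinds of relations.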
    

    2. Cleaning multiple schemas

    Rather than adjusting the macro above to clean all schemas, I thought it might be helpful to use a second macro that gets all the schemas and then loops through them, calling the macro above for each one.

    clean_all_schemas.sql

    {% macro clean_all_schemas() %}
    
      {% set get_schemas_query %}
          SELECT schema_name FROM `{{ target.project }}.region-{{ target.location }}.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;
      {% endset %}
    
      {% set schemas = run_query(get_schemas_query).columns[0].values() %}
    
      {% for schema in schemas %}
        {% do log("Cleaning up " + schema + " schema", True) %}
        {{ drop_old_relations(schema=schema) }}
      {% endfor %}
    
    {% endmacro %}
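
    As written, clean_all_schemas visits every schema the SCHEMATA query returns and always executes the drops. If that is broader than you want, one possible variation is sketched below. The macro name clean_dbt_schemas and its prefix argument are hypothetical additions, not part of the original macros; it assumes drop_old_relations is defined as above and that target.location is set in your profile.

    ```sql
    {# Hypothetical variant of clean_all_schemas: only visits schemas whose name
       starts with a prefix, and passes a dryrun flag through (default: dry run). #}
    {% macro clean_dbt_schemas(prefix=target.schema, dryrun=True) %}

      {% if execute %}
        {% set get_schemas_query %}
            SELECT schema_name
            FROM `{{ target.project }}.region-{{ target.location }}.INFORMATION_SCHEMA.SCHEMATA`
            ORDER BY schema_name
        {% endset %}

        {% set schemas = run_query(get_schemas_query).columns[0].values() %}

        {# Skip schemas that do not match the prefix, e.g. datasets dbt does not manage #}
        {% for schema in schemas if schema.startswith(prefix) %}
          {% do log("Cleaning up " ~ schema ~ " schema", True) %}
          {% do drop_old_relations(schema=schema, dryrun=dryrun) %}
        {% endfor %}
      {% endif %}

    {% endmacro %}
    ```

    With a target schema of dbt_dev, the default prefix would match dbt_dev, dbt_dev_stage, dbt_dev_mart and dbt_dev_analytics from the list in the question. Passing dryrun: False in --args would then execute the drops once the printed commands look right.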
    

    Commands

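    • dbt run-operation drop_old_relations - Drop old relations in the target schema.
    • dbt run-operation drop_old_relations --args "{schema: dbt_dev_mart, dryrun: True}" - Print the DROP commands for one schema without executing them.
    • dbt run-operation clean_all_schemas - Drop old relations across all schemas in the target BQ project (this macro has no dry-run option as written).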

    Let us know if that works for your use case :-)