I am building a data pipeline that ingests data from several schemas. I am trying to use Jinja to take a list of schema names, query specific columns in a table with the same name in each of these schemas, and then combine the data into a single staging table.
On a conceptual level, I know that I need to create two lists with Jinja: a list of the schema names that I want to loop over, and a list of the column names that I want to query in the table that exists in all of the schemas. Let's call the list of schema names schema_names and the list of column names column_names.
{% set schema_names = ('brand1', 'brand2', 'brand3') %}
{% set column_names = ('col1', 'col2', 'col3') %}
In theory, the next step is to pass these values via Jinja into a query that selects the same list of columns from the table in each schema in schema_names. Were I to write the SQL query manually, I would use several WITH clauses (CTEs) whose results are then combined using UNION ALL:
/* Query that adds a column with value `brand1` while querying the `brand1` schema */
WITH schema1_data AS (
    SELECT
        'brand1' AS schema_name,
        col1,
        col2,
        col3
    FROM brand1.table_name
),
/* Query that adds a column with value `brand2` while querying the `brand2` schema */
schema2_data AS (
    SELECT
        'brand2' AS schema_name,
        col1,
        col2,
        col3
    FROM brand2.table_name
),
/* Query that adds a column with value `brand3` while querying the `brand3` schema */
schema3_data AS (
    SELECT
        'brand3' AS schema_name,
        col1,
        col2,
        col3
    FROM brand3.table_name
),
/* Union statement combining identical tables from 3 schemas */
combined_schemas AS (
    SELECT * FROM schema1_data
    UNION ALL
    SELECT * FROM schema2_data
    UNION ALL
    SELECT * FROM schema3_data
)
SELECT *
FROM combined_schemas
Thoughts on how this would be best implemented in Jinja/dbt? Also open to alternative framings of the problem that take a different approach to combine information from identical tables in different schemas into a single staging table to be processed by a data pipeline.
Your UNION ALL approach sounds good. You can simplify your plan a little: if the columns you want from each model are the same, then you don't need the column_names variable. Also, CTEs only help when you write this query by hand; with a Jinja template they actually make your life harder, so don't force yourself to use them.
You will need:
- the Jinja for loop, which is documented in the Jinja Template Designer Documentation;
- the loop.last special variable, which you can access within the for loop block.
I have not tested the following code, but here is how I would write it:
{% set schema_names = ('brand1', 'brand2', 'brand3') %}
{% for schema in schema_names %}
(
select
'{{ schema }}' as schema_name,
col1,
col2,
col3
from {{ schema }}.table_name
)
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
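If you would still rather keep the question's column_names list (for example, so the column list is defined in one place), Jinja's join filter can render it inside the same loop. This is an untested sketch that reuses the schema names and the table_name placeholder from the question, and it assumes every schema's table has all of the listed columns:

```sql
{# Schemas to union; each is assumed to contain a table called table_name #}
{% set schema_names = ['brand1', 'brand2', 'brand3'] %}
{# Columns to select from every schema's table #}
{% set column_names = ['col1', 'col2', 'col3'] %}

{% for schema in schema_names %}
(
    select
        '{{ schema }}' as schema_name,
        {# join renders the list as: col1, col2, col3 #}
        {{ column_names | join(', ') }}
    from {{ schema }}.table_name
)
{# Emit union all between selects, but not after the last one #}
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
```

Each rendered select is the same as in the loop above; the only difference is that adding or removing a column means editing one list instead of the select body.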
If you don't want to code this yourself, dbt_utils already implements this functionality in its union_relations macro.
That said, it is worth comparing your version against their implementation, which is more complicated than what you need because it handles more corner cases; you can find it in the dbt-utils repository on GitHub.
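For comparison, a call to dbt_utils.union_relations might look like the untested sketch below. It assumes dbt_utils is installed through packages.yml, that the brand schemas live in the target database, and that table_name stands in for your real table name. union_relations expects Relation objects rather than schema-name strings, so the sketch builds them with api.Relation.create:

```sql
{# Assumption: all brand schemas are in the database dbt is connected to #}
{% set schema_names = ['brand1', 'brand2', 'brand3'] %}

{# Build one Relation object per schema; table_name is a placeholder #}
{% set relations = [] %}
{% for schema in schema_names %}
    {% do relations.append(api.Relation.create(
        database=target.database,
        schema=schema,
        identifier='table_name'
    )) %}
{% endfor %}

{# include restricts the output to these columns;
   source_column_name names the column recording each row's origin #}
{{ dbt_utils.union_relations(
    relations=relations,
    include=['col1', 'col2', 'col3'],
    source_column_name='schema_name'
) }}
```

As I understand the macro, columns missing from some of the tables are filled with null, and the source column holds the full relation name (database, schema and table) rather than just `brand1`, so if you need the bare schema name your own loop may still be the simpler choice.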