I have the following issue:
I have managed to do this by creating a storage integration, and with my role (used for running dbt) I manually created a schema and assigned usage on it. So far so good.
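Roughly, the manual setup looked like this (object names here are illustrative placeholders, not my exact ones):

-- storage integration and stage created manually, outside dbt
create storage integration my_s3_integration
  type = external_stage
  storage_provider = 'S3'
  storage_aws_role_arn = 'arn:aws:iam::<account_id>:role/<role_name>'
  enabled = true
  storage_allowed_locations = ('s3://my-bucket/mixpanel/');

create schema RAW_DEV.S3_MIXPANEL;
grant usage on schema RAW_DEV.S3_MIXPANEL to role my_dbt_role;

create stage RAW_DEV.S3_MIXPANEL.my_s3_stage
  storage_integration = my_s3_integration
  url = 's3://my-bucket/mixpanel/'
  file_format = (type = json);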
Then I read about this:
https://github.com/fishtown-analytics/dbt-external-tables
The problem is that this is the only way it runs properly: I had to alter my dbt profiles.yml, set the default schema to S3_MIXPANEL with default database RAW_DEV, and run with a different target and role via the --target 'ingest_dev' parameter.
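For reference, the extra target in my profiles.yml looks roughly like this (account, user, role, and warehouse are placeholders):

my_profile:
  target: dev
  outputs:
    # (usual dev output omitted)
    ingest_dev:
      type: snowflake
      account: <my_account>
      user: <my_user>
      role: <ingest_role>
      warehouse: <my_warehouse>
      database: RAW_DEV
      schema: S3_MIXPANEL
      threads: 1
      # password / authenticator omitted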
I keep thinking there should be a more sophisticated solution, where I can create schemas, query metadata, and use something like {{ source() }} so my documentation can show that this is an external source. I don't think dbt-external-tables is really well explained for my case here.
Can anyone please help me and share how to create schemas and query from external stages properly, without changing the default schema macro and profiles.yml each time?
I have succeeded in running the following code:
{{
    config(
        materialized = 'incremental',
        schema = generate_schema_name('S3_MIXPANEL')
    )
}}

select
    metadata$filename as file_name,
    to_date(substr(metadata$filename, 16, 10), 'yyyy/mm/dd') as event_date,
    $1 as payload,
    convert_timezone('Europe/London', to_timestamp_tz($1:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    convert_timezone('Europe/London', current_timestamp) as ingested_at
from @my_s3_stage

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    where event_date > (select max(event_date) from {{ this }})
{% endif %}

{{ row_limit() }}
EDIT 22-06-20:
I have added the src_mixpanel.yml file to my models and ran the dbt command. I had to also specify the data types, so I added them. Then I apparently had to add the macros to my own macros folder as well (maybe a stupid question, but I don't really know how to install your package, so I manually copied all of its macros into mine).
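(I believe the intended way to install it is to add the package to packages.yml and run dbt deps, something like:

packages:
  - git: "https://github.com/fishtown-analytics/dbt-external-tables"
    revision: master

but for now I copied the macros in by hand.)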
Now, when I run this command:

dbt run-operation stage_external_sources

with this src_mixpanel.yml:
version: 2

sources:
  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false  # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(substr(metadata$filename, 16, 10), 'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
        columns:
          - name: properties
            data_type: variant
I get an error:
Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'
As the maintainer of the dbt-external-tables package, I'll share its opinionated view. The package believes you should stage all external sources (S3 files) as external tables or with snowpipes first, in a process that includes as little confounding logic as possible. Then you can select from them, as sources, in dbt models, alongside all requisite business logic.
If my understanding is correct, you would stage your Mixpanel data as below, in a file called (e.g.) models/staging/mixpanel/src_mixpanel.yml:
version: 2

sources:
  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false  # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(substr(metadata$filename, 16, 10), 'yyyy/mm/dd')
              data_type: date
        columns:
          - name: properties
            data_type: variant
You would run this macro from the package to create the external table and, after creation, to update its partition metadata if you don't have auto_refresh enabled (see the Snowflake docs):
dbt run-operation stage_external_sources
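Under the hood, the macro compiles DDL along these lines (a simplified sketch, not the exact statements it generates):

-- simplified sketch of what stage_external_sources runs for this source
create or replace external table raw_dev.s3_mixpanel.events (
    event_date date as to_date(substr(metadata$filename, 16, 10), 'yyyy/mm/dd'),
    properties variant as (value:properties::variant)
)
partition by (event_date)
location = @my_s3_stage
file_format = ( type = json )
auto_refresh = false;

-- refresh partition metadata after creation (needed while auto_refresh is off)
alter external table raw_dev.s3_mixpanel.events refresh;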
You can then select from this source in an incremental model, like the one you have above. Since event_date is a partition column on this external table, filtering on it should enable Snowflake to prune files (though historically that has been inconsistent for dynamic, subquery-derived filters).
{{
    config(
        materialized = 'incremental'
    )
}}

select
    metadata$filename as file_name,
    event_date,
    value as payload,
    convert_timezone('Europe/London', to_timestamp_tz(properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
    convert_timezone('Europe/London', current_timestamp) as modeled_at
from {{ source('s3_mixpanel', 'events') }}

{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    where event_date > (select max(event_date) from {{ this }})
{% endif %}

{{ row_limit() }}
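In practice, that means running the staging operation before your models, for example (model name is hypothetical):

dbt run-operation stage_external_sources
dbt run --models my_mixpanel_model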