Tags: javascript, sql, google-bigquery, etl, dataform

How can I add advanced logic to my Dataform pipeline?


TLDR

I am hoping to improve my Dataform pipeline by adding a layer of modularity (in the form of JavaScript functions) that detects when the schema of my raw data source undergoes a breaking change and automatically makes the necessary downstream adjustments across all of the .SQLX scripts in my project, adjustments that would otherwise have to be made manually.

--

Background Context

For context, I have a data collection script that publishes a high throughput of JSON-encoded raw data to a Pub/Sub topic, and a Pub/Sub-to-BigQuery subscription loads this raw data into an hourly partitioned table in BQ. The resulting table serves as the raw data source on which more than 20 user-facing data tables are built. To be specific, Dataform reads from this raw table roughly every hour and runs the transformations needed to generate/update all of these tables, each according to its own .SQLX script containing the business logic.

--

Problem

Every once in a while, a breaking schema change has to be made on the raw table (i.e., a field name or type is modified), and it results in a lot of manual adjustments on the Dataform side to keep the query logic consistent with the new schema and ensure Dataform does not crash. I would like to minimize the amount of manual work needed on the Dataform side whenever such a change is made.

--

Potential Solutions

Currently, whenever the schema is modified, Terraform automatically rebuilds the raw table, but under a different name that contains the version number (e.g., table_1-0-0 --> table_2-0-0). I'm hoping to add some JavaScript functions that tell Dataform to process data a certain way based on the version/name of the raw table, for example something along the lines of the sketch below. It should be done in such a way that if a new field is introduced to the raw table, or an existing one is modified, I don't have to go through the 20 different .SQLX files and update the config, how the field is referenced in the query, and so on.
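Purely to illustrate what I have in mind (the file, function, and table names here are made up), something like a version-to-table lookup living in includes/:

    // includes/raw_source.js -- illustrative only, names are hypothetical
    // Maps a schema version to its fully qualified raw table, so .sqlx files
    // could reference ${raw_source.table("2-0-0")} instead of a hard-coded name.
    const RAW_TABLES = {
      "1-0-0": "`project_id.dataset_id.table_1-0-0`",
      "2-0-0": "`project_id.dataset_id.table_2-0-0`",
    };

    function table(version) {
      return RAW_TABLES[version];
    }

    module.exports = { table };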

I know it seems redundant, but is such a thing possible to accomplish? Thank you in advance.


Solution

  • A bit late, but I want to post an update and share the solution I ended up implementing to partially achieve what I wanted.

    I created two files in my includes/ directory:

    • constants.js: a list of constants specifying the full BigQuery paths to all of the raw tables, one per schema version.
    • modules.js: a function that builds a UNION ALL over the current and all outdated schema versions. This function is then called from the pre_operations block of my main user-facing table on each incremental run; a usage sketch follows the constants.js listing below.

    modules.js file:

        // modules.js
        // Selects raw data from the current schema and all previous schema versions.
        // Returns the opening of a WITH clause (trailing comma included); the calling
        // .sqlx script completes it with its own CTEs and a final SELECT.
        // Note: the filter column differs by schema version (ingestion_time in the
        // newest table, server_timestamp in the older ones).
        const constants = require("includes/constants"); // table paths defined in constants.js

        function set_latest_schema() {
          return `
            WITH UNIONED AS (
              SELECT * FROM ${constants.raw_4} WHERE DATE(ingestion_time) >= "2023-01-03"
              UNION ALL
              SELECT * FROM ${constants.raw_3} WHERE DATE(server_timestamp) >= "2023-01-01"
              UNION ALL
              SELECT * FROM ${constants.raw_2} WHERE DATE(server_timestamp) >= "2023-01-01"
              UNION ALL
              SELECT * FROM ${constants.raw_1_1} WHERE DATE(server_timestamp) >= "2023-01-01"
              UNION ALL
              SELECT * FROM ${constants.raw_1} WHERE DATE(server_timestamp) >= "2023-01-01"
            ),
          `;
        }

        module.exports = { set_latest_schema };
    

    constants.js file:

        // constants.js
        // Full BigQuery paths of the raw tables to source from (newest to oldest).
        const raw_4 = '`project_id.dataset_id.table_id_5`';
        const raw_3 = '`project_id.dataset_id.table_id_4`';
        const raw_2 = '`project_id.dataset_id.table_id_3`';
        const raw_1_1 = '`project_id.dataset_id.table_id_2`';
        const raw_1 = '`project_id.dataset_id.table_id_1`';

        module.exports = { raw_4, raw_3, raw_2, raw_1_1, raw_1 };
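
    For reference, consuming the fragment looks roughly like this in a .SQLX script; since the helper returns the opening of a WITH clause with a trailing comma, the script completes it with its own CTEs and final SELECT. The CTE, column names, and filter below are placeholders, not my actual business logic:

        -- somewhere in a user-facing .sqlx script (illustrative only)
        ${modules.set_latest_schema()}

        CLEANED AS (
          SELECT *
          FROM UNIONED
          WHERE some_field IS NOT NULL  -- per-table business logic lives here
        )

        SELECT * FROM CLEANED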
    

    In the event of a schema change, a new raw table is constructed, a new constant is added pointing to it, and a new line is added to the UNION ALL statement. A Pub/Sub message is then sent automatically, triggering a Cloud Function that deletes the existing user-facing tables, at which point my orchestration layer (in Cloud Workflows) starts triggering frequent executions to rebuild them against the new schema. This is not a perfect solution, since some changes still have to be made manually in the query logic to account for the new schema.
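    To give an idea of the teardown step, the Cloud Function is essentially a Pub/Sub-triggered function that loops over the user-facing tables and drops them. The sketch below is simplified and uses placeholder dataset/table names:

        // Simplified sketch of the Pub/Sub-triggered Cloud Function (Node.js).
        // Dataset and table names are placeholders.
        const { BigQuery } = require('@google-cloud/bigquery');
        const bigquery = new BigQuery();

        const USER_FACING_TABLES = ['user_table_1', 'user_table_2']; // ~20 tables in practice

        exports.dropUserFacingTables = async (message, context) => {
          for (const tableId of USER_FACING_TABLES) {
            // Dropping the table forces the next Dataform run to rebuild it from scratch
            await bigquery.dataset('reporting_dataset').table(tableId).delete();
          }
        };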

    In addition, I have since realized that schema changes caused by new fields can be made much simpler, and the amount of manual labour drastically reduced, by using a repeated record of custom key-value pairs: new fields become key-value entries that can be added without having to rebuild the raw table. So that is one less step to worry about whenever breaking changes occur.
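
    To illustrate that pattern (the column, file, and key names here are hypothetical, not my actual schema), new fields land in a repeated key-value RECORD column, and a small include can pull individual values out of it:

        // includes/kv.js -- hypothetical helper for a column like
        // attributes ARRAY<STRUCT<key STRING, value STRING>>
        function get_attribute(key) {
          return `(SELECT value FROM UNNEST(attributes) WHERE key = "${key}" LIMIT 1)`;
        }

        module.exports = { get_attribute };

    A new field then only needs to be referenced as ${kv.get_attribute("new_field")} in the scripts that actually use it, instead of touching every .SQLX file.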