Search code examples
sqlgoogle-bigquerygoogle-analytics-4

How to handle new fields in a STRUCT when dynamically adding using union on all tables in BigQuery


Process

In BigQuery, Google Analytics data is linked to a BigQuery project. Each site corresponds to a dataset, and each day a new events_* table is added to that dataset. My query dynamically iterates over all datasets and their tables and unions them into a single destination table, appending the new events_* tables as they arrive each day.

Issue

One of the fields in the events_* tables changed a few days ago: the collected_traffic_source field had 4 new columns added to its STRUCT. Older events_* tables don't have the extra fields, which causes this error:

Query error: Query column 27 has type STRUCT<manual_campaign_id STRING, manual_campaign_name STRING, manual_source STRING, ...> which cannot be inserted into column collected_traffic_source, which has type STRUCT<manual_campaign_id STRING, manual_campaign_name STRING, manual_source STRING, ...> at [3:3]

I recreated the unioned table with the additional columns, but when I rerun the query I get the following error, because the older tables don't have them:

Query error: Inserted row has wrong column count; Has 28, expected 32 at [3:3]

Question

Is it possible to update the query so that it checks how many fields each table's collected_traffic_source STRUCT has and builds the INSERT accordingly?

Query

-- Purpose: for every dataset named like 'analytics_%' in the google-analytics-prod
-- project, append the rows of its events_* tables that are not yet present in
-- bi.production.google_analytics_brandsites.
--
-- dataset_name    : name of the dataset handled by the current loop iteration.
-- dataset_counter : 1-based position of that dataset in the temp table `datasets`.
DECLARE dataset_name STRING;
DECLARE dataset_counter INT64;

-- Get list of datasets matching the pattern 'analytics_xxx'
-- ROW_NUMBER() OVER() has no ORDER BY, so the numbering is arbitrary; it is only
-- used as a loop index, not as a meaningful ordering.
CREATE TEMP TABLE datasets AS (
  SELECT ROW_NUMBER() OVER() rownumber, schema_name AS dataset_name
  FROM google-analytics-prod.INFORMATION_SCHEMA.SCHEMATA
  WHERE schema_name LIKE 'analytics_%'
);

SET dataset_counter = 1;
-- Iterate through datasets
-- If no dataset matched, MAX(rownumber) is NULL and the loop body never runs.
WHILE dataset_counter <= (SELECT MAX(rownumber) FROM datasets)  DO
  -- Fetch the next dataset name
  SET dataset_name = (SELECT dataset_name FROM datasets WHERE rownumber = dataset_counter);
  -- Error occurs here.
  -- The INSERT has no column list, so the 28 selected columns are matched to the
  -- destination table by position. The wildcard events_* reads all daily tables of
  -- the dataset through one schema, so collected_traffic_source has a single STRUCT
  -- type for the whole statement; when that type or the column count differs from
  -- the destination table, the statement fails as a whole.
  -- The three %s placeholders are all filled with dataset_name; %% is a literal %
  -- for PARSE_DATE. NOT EXISTS skips (dataset, table suffix) pairs already loaded.
  EXECUTE IMMEDIATE FORMAT('''
  INSERT INTO bi.production.google_analytics_brandsites
  SELECT "%s" AS dataset_brandsite, _TABLE_SUFFIX AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items, t.collected_traffic_source, t.is_active_user FROM google-analytics-prod.%s.events_* AS t
  WHERE NOT EXISTS(
    SELECT 1
    FROM bi.production.google_analytics_brandsites  AS d
    WHERE t._TABLE_SUFFIX = d.events_tbl_name
    AND
    "%s" = d.dataset_brandsite
  )
  ''', dataset_name, dataset_name, dataset_name)
  ;
  -- Advance to the next dataset.
  SET dataset_counter = (dataset_counter + 1);
END WHILE;

Solution

  • Using INFORMATION_SCHEMA.COLUMN_FIELD_PATHS made it possible to handle the STRUCT dynamically with a conditional. The query no longer iterates only over the datasets in the project; it also iterates over each table in a dataset and looks up that table's field paths. It counts the field paths of the collected_traffic_source STRUCT for the table and uses that count to choose which INSERT to run.

    -- Purpose: append every events_* table of every 'analytics_%' dataset in the
    -- google-analytics-prod project to `analytics.production.google_analytics`,
    -- one source table at a time, skipping (dataset, table) pairs already loaded.
    -- Each table's collected_traffic_source STRUCT is inspected first so that
    -- tables exported before the STRUCT was extended can be padded with NULLs.
    --
    -- Both INSERT statements have no column list, so columns are matched to the
    -- destination by position: the two SELECT lists must stay the same width and
    -- in the same order.
    DECLARE dataset_name STRING;
    DECLARE dataset_counter INT64;
    DECLARE table_name_ STRING;
    DECLARE table_counter INT64;
    -- Despite the name, this holds the NUMBER of field paths reported for the
    -- collected_traffic_source column of the current table (0 if the column is absent).
    DECLARE collected_traffic_source_exists INT64;

    -- Get list of datasets matching the pattern 'analytics_xxx'
    -- ROW_NUMBER() OVER() is only a loop index; the order is arbitrary.
    CREATE TEMP TABLE datasets AS (
      SELECT ROW_NUMBER() OVER() rownumber, schema_name AS dataset_name
      FROM google-analytics-prod.INFORMATION_SCHEMA.SCHEMATA
      WHERE schema_name LIKE 'analytics_%'
    );

    SET dataset_counter = 1;
    -- Iterate through datasets
    WHILE dataset_counter <= (SELECT MAX(rownumber) FROM datasets)  DO
      -- Fetch the next dataset name
      SET dataset_name = (SELECT dataset_name FROM datasets WHERE rownumber = dataset_counter);
      -- Dynamically list the dataset's event tables. The dataset name has to be
      -- spliced into the INFORMATION_SCHEMA path, hence the dynamic SQL.
      EXECUTE IMMEDIATE FORMAT("""
      CREATE TEMP TABLE tables_brandsite_31 AS (
      SELECT ROW_NUMBER() OVER() rownumber, table_name
      FROM `google-analytics-prod.%s`.INFORMATION_SCHEMA.TABLES
      WHERE table_name LIKE 'events_%%'
      AND
      table_schema = '%s')
      """, dataset_name, dataset_name)
      ;
      SET table_counter = 1;
      -- If the dataset has no matching table, MAX(rownumber) is NULL and the
      -- loop body is skipped.
      WHILE table_counter <= (SELECT MAX(rownumber) from tables_brandsite_31) DO
        SET table_name_ = (SELECT table_name FROM tables_brandsite_31 WHERE rownumber = table_counter);
        -- Determine number of fields within the STRUCT. The scalar result is read
        -- straight into the variable with INTO, so no per-table temp table has to
        -- be created and dropped.
        EXECUTE IMMEDIATE FORMAT('''
        SELECT COUNT(DISTINCT field_path) fieldpath FROM `google-analytics-prod.%s`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
        WHERE table_schema = "%s"
        and
        column_name = 'collected_traffic_source'
        AND
        table_name = "%s"
        ''', dataset_name, dataset_name, table_name_)
        INTO collected_traffic_source_exists;

        -- 13 = the collected_traffic_source column itself plus its 12 leaf fields,
        -- i.e. the table already has the extended STRUCT.
        IF collected_traffic_source_exists = 13 THEN
          -- NOTE(review): t.is_active_user is selected twice. It is kept because it
          -- yields the 32 columns the destination expects; confirm what the 29th
          -- destination column is meant to hold.
          EXECUTE IMMEDIATE FORMAT('''
          INSERT INTO `analytics.production.google_analytics`
          SELECT "%s" AS dataset_brandsite, "%s" AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items, t.collected_traffic_source, t.is_active_user, t.is_active_user, t.batch_event_index, t.batch_page_id, t.batch_ordering_id FROM `google-analytics-prod.%s.%s` AS t
          WHERE NOT EXISTS(
            SELECT 1
            FROM `analytics.production.google_analytics`  AS d
            WHERE d.events_tbl_name = "%s"
            AND
            "%s" = d.dataset_brandsite
          )
          ''', dataset_name, table_name_, dataset_name, table_name_, table_name_, dataset_name)
          ;
        ELSE
          -- Handle the older tables: rebuild the STRUCT with all 12 fields in schema
          -- order, filling the fields the table lacks with typed NULLs, and pad the
          -- trailing columns so the row has the same 32 positions as the branch above.
          -- NULLs are CAST explicitly because a bare NULL would not carry the
          -- destination field's type.
          -- NOTE(review): assumes the older tables already contain manual_medium and
          -- manual_term and that the batch_* destination columns are INT64, as in the
          -- published GA4 export schema; verify against the actual tables. A table
          -- with no collected_traffic_source column at all (count 0) also lands here
          -- and would still fail.
          EXECUTE IMMEDIATE FORMAT('''
          INSERT INTO `analytics.production.google_analytics`
          SELECT "%s" AS dataset_brandsite, "%s" AS events_tbl_name, PARSE_DATE('%%Y%%m%%d', t.event_date) event_date_formatted,t.event_date, t.event_timestamp, t.event_name, t.event_params, t.event_previous_timestamp, t.event_value_in_usd, t.event_bundle_sequence_id, t.event_server_timestamp_offset, t.user_id, t.user_pseudo_id, t.privacy_info, t.user_properties, t.user_first_touch_timestamp, t.user_ltv, t.device, t.geo, t.app_info, t.traffic_source, t.stream_id, t.platform, t.event_dimensions, t.ecommerce, t.items,
          STRUCT(t.collected_traffic_source.manual_campaign_id,
          t.collected_traffic_source.manual_campaign_name,
          t.collected_traffic_source.manual_source,
          t.collected_traffic_source.manual_medium,
          t.collected_traffic_source.manual_term,
          t.collected_traffic_source.manual_content,
          CAST(NULL AS STRING) AS manual_source_platform,
          CAST(NULL AS STRING) AS manual_creative_format,
          CAST(NULL AS STRING) AS manual_marketing_tactic,
          t.collected_traffic_source.gclid,
          t.collected_traffic_source.dclid,
          t.collected_traffic_source.srsltid) AS collected_traffic_source,
          t.is_active_user,
          t.is_active_user,
          CAST(NULL AS INT64) AS batch_event_index,
          CAST(NULL AS INT64) AS batch_page_id,
          CAST(NULL AS INT64) AS batch_ordering_id
          FROM `google-analytics-prod.%s.%s` AS t
          WHERE NOT EXISTS(
            SELECT 1
            FROM `analytics.production.google_analytics`  AS d
            WHERE d.events_tbl_name = "%s"
            AND
            d.dataset_brandsite = "%s"
          )
          ''', dataset_name, table_name_, dataset_name, table_name_, table_name_, dataset_name)
          ;
        END IF;
        SET table_counter = (table_counter + 1);
      END WHILE;
      -- Drop the table list only after every table of the dataset has been
      -- processed: the inner loop condition reads it on each iteration, and the
      -- next dataset needs the name free for its own CREATE TEMP TABLE.
      DROP TABLE IF EXISTS tables_brandsite_31;
      SET dataset_counter = (dataset_counter + 1);
    END WHILE;