Tags: google-bigquery, google-cloud-pubsub, google-cloud-data-fusion

Pub/Sub message ingestion into BigQuery using Data Fusion


I've built a simple real-time pipeline that receives messages and attributes from a Pub/Sub subscription, wrangles them to keep only a few fields, and loads them into a BigQuery table. When the pipeline is deployed and run, the log says:

Importing into table '<tablename>' from 0 paths; path[0] is '(empty)'; awaitCompletion: true

I don't understand why there are 0 paths, or why all the records are going to errors even though an Error Collector is set up. Is there a way to debug the Wrangler stage better? Sample Wrangler directives are below:

keep message,attributes
set-charset :message 'utf-8'
set-type :attributes string
parse-as-json :attributes 1
parse-as-json :message 5
keep attributes_page_url,attributes_cart_remove,attributes_page_title,attributes_transaction_complete,message_event_id,message_data_dom_domain,message_data_dom_title,message_data_dom_pathname,message_data_udo_ut_visitor_id
columns-replace s/^attributes_//g
columns-replace s/^message_//g
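
For reference, here is a sketch of a test publisher that sends one message shaped so it should survive these directives. The project/topic names are placeholders, and the JSON nesting is only a guess reconstructed from the flattened column names above:

# Hypothetical test publisher (placeholder names; nesting inferred from
# the kept columns above, e.g. message_data_dom_domain -> data.dom.domain).
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

body = {
    "event_id": "evt-123",
    "data": {
        "dom": {
            "domain": "shop.example.com",
            "title": "Checkout",
            "pathname": "/cart",
        },
        # Guessed nesting for message_data_udo_ut_visitor_id
        "udo": {"ut_visitor_id": "visitor-42"},
    },
}

future = publisher.publish(
    topic_path,
    data=json.dumps(body).encode("utf-8"),  # payload must be bytes
    # Pub/Sub attributes are always string-valued
    page_url="https://shop.example.com/cart",
    cart_remove="false",
    page_title="Checkout",
    transaction_complete="false",
)
print(future.result())  # blocks until the message ID is returned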

Any help is appreciated. Thanks!


Solution

  • The reason you see 0 paths to load is that every record is failing during wrangling, so nothing is staged for the BigQuery import.

    There are two ways to capture these errors:

    1. Configure the Wrangler stage to Fail Pipeline on error. This will surface the exception/error in the logs.
    2. Attach the Error output from the Wrangler stage to an Error Collector, and write its output to a File or GCS Sink. This lets you capture the error message for each failing row (a sketch for reading those errors back follows this list). Configure the Error Collector as follows:
      1. Error Message Column Name = errorMsg
      2. Error Code Column Name = errorCode
      3. Error Emitter Node Name = invalidRecord
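
Once the Error Collector output lands in GCS, you can read the failed records back and inspect the errorMsg/errorCode columns directly. A minimal sketch, assuming the sink writes newline-delimited JSON under a hypothetical gs://my-bucket/errors/ path:

# Minimal sketch: read collected error records back from a GCS Sink.
# Bucket name and prefix are placeholders; assumes the sink format is
# newline-delimited JSON.
import json
from google.cloud import storage

client = storage.Client()

for blob in client.list_blobs("my-bucket", prefix="errors/"):
    for line in blob.download_as_text().splitlines():
        record = json.loads(line)
        # Column names match the Error Collector config above
        print(record.get("errorCode"), record.get("errorMsg"))

In this pipeline the most likely culprit is one of the parse-as-json directives rejecting a payload that isn't the expected JSON; the errorMsg column should point at the failing directive.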