Search code examples
node.jsgoogle-bigquerygoogle-cloud-functionsgoogle-cloud-pubsubdml

How to update data in the BigQuery stream buffer?


My Workflow

  • Data gets streamed to BigQuery from Pub/Sub using cloud function.

  • Data stays in Stream buffer for 90 minutes therefor I cannot do an Update statement.

  • I need to update the Result column before that time.

    please help.

I receive data in "Pub/Sub" then a "Cloud functions" is triggered which inserts the data inside "BigQuery"

This is the code:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();


exports.telemetryToBigQuery = (data, context) => {

  if (!data.data) {
    throw new Error('No telemetry data was provided!');
    return;
  }

  //Data comes in as base64
  console.log(`raw data: ${data.data}`);

  //Data gets decoded from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();


var indexesSemicolons = [];

for (var i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ";") {
        indexesSemicolons.push(i);
    }
}

if (indexesSemicolons.length == 14) {

     const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
     const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
     const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the JSON objects into my_dataset:my_table.

    
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result

        }
      ];

      // Insert data into a table
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }
    insertRowsAsStream();
  } else {
    console.log("Invalid message");
    return;
  }
}

This data stays in the BigQuery stream buffer for about 90 minutes, But I need to need to execute an update query which changes the Result column. This is not allowed and causes an error

ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError

I need a way to update the Result before the 90 minute buffer time. Can you guys help me please.

I read the following pages online

Life of a BigQuery stream

I read the answer from the following question I think I understand the idea of what he is talking about but I don't know how to execute it.

If I am correct he is saying to stream my data to a temporary table and from there put it into a permanent table.

Stackoverflow DML Update bigQuery


Solution

  • I am now using the BigQuery Create a job methode.

    the example found here

    I directly puts the data inside the table so I don't have to wait 90 minutes for the streaming buffer.