Search code examples
google-cloud-platform, streaming

Streaming Insert/Update in Google Cloud - BigQuery


I am trying to stream Salesforce data to Google Cloud BigQuery. I have managed to implement the insert stream: for example, whenever a new Lead is created in Salesforce, the same record is inserted into a BigQuery table. Now I am checking whether there is a way to upsert the data. I know that there is a streaming buffer, which does not allow any DML operation on the inserted rows while they remain in the buffer for a short duration.

I would really appreciate any hints on the upsert part.

Edited - June 6th, 2019

I am using the Cloud Function below to insert a record:

    /**
     * Responds to any HTTP request.
     *
     * @param {!express:Request} req HTTP request context.
     * @param {!express:Response} res HTTP response context.
     */
    exports.helloWorld = (req, res) => {
    let message = req.query.mes || req.body.mes || 'Hello World!';
    res.status(200).send(req.body);
    var d =JSON.stringify(req.body);
      console.log(d);
    var e = d.replace(/:""/g, '');
  var f = e.replace(/\\/g, '');
  var g = f.replace(/"{n /g, '');
  var h = g.replace(/n}"/g, '');
  var i = h.replace(/n /g, '');
  console.log(i);
  const {BigQuery} = require('@google-cloud/bigquery');
  const bigquery = new BigQuery();
  var instance = "DEMO";
  var table = "HTTP";
  bigquery
    .dataset(instance)
    .table(table)
    .insert(JSON.parse(i),
    {'ignoreUnknownValues':true, 'raw':false})
    .then ((data) => {
      console.log('Inserted 1 rows');
      console.log(data);
    })
    };

Solution

  • Code that worked for the update:

    exports.helloWorld = (req, res) => {
      let message = req.query.mes || req.body.mes || 'Hello World!';
      res.status(200).send(req.body);
      var d =JSON.stringify(req.body);
      console.log(d);
      var e = d.replace(/:""/g, '');
      var f = e.replace(/\\/g, '');
      var g = f.replace(/"{n /g, '');
      var h = g.replace(/n}"/g, '');
      var i = h.replace(/n /g, '');
      console.log(i);
      var j = JSON.parse(i);
      var k = JSON.stringify(j.Id);
      var id = k.replace(/"/g, '');
      console.log(k);
      console.log(id);
      const {BigQuery} = require('@google-cloud/bigquery');
      const bigquery = new BigQuery();
      var instance = "DEMO";
      var table = "LEADS_STG";
      bigquery
        .dataset(instance)
        .table(table)
        .insert(JSON.parse(i),
        {'ignoreUnknownValues':true, 'raw':false})
        .then ((data) => {
          console.log('Inserted 1 rows');
          console.log(data);
        })
      /*const bigqueryClient = new BigQuery();*/
      var delayInMilliseconds = 1000;
      setTimeout(function() {
      bigquery.query({
      query: [
        'MERGE DEMO.LEADS_D T USING  (SELECT ID,NAME,LEADSOURCE,COMPANY FROM DEMO.LEADS_STG where ID= ? order by LASTMODIFIEDDATE DESC LIMIT 1) S ON  T.ID = S.ID  WHEN MATCHED  THEN  UPDATE   SET  NAME = S.NAME,  LEADSOURCE = S.LEADSOURCE,  COMPANY = S.COMPANY  WHEN NOT MATCHED  THEN INSERT  ( ID,    NAME,    LEADSOURCE,    COMPANY) VALUES( ID, NAME,LEADSOURCE,COMPANY)'  
      ].join(' '),
      params: [
        id
      ]
    }, function(err, rows) {});
        }, delayInMilliseconds);
    };