I am trying to stream Salesforce data to Google Cloud BigQuery. I have managed to implement the insert stream: for example, whenever a new Lead is created in Salesforce, the same record is inserted into the BigQuery tables. I am now checking whether there is a way to upsert the data. I know that there is a streaming buffer, which does not allow any DML operation on the inserted data, since those rows stay in the streaming buffer for a short duration.
I would really appreciate any hints on the upsert part.
Edited - June 6th, 2019
I am using the Cloud Function below to insert a record:
/**
* Responds to any HTTP request.
*
* @param {!express:Request} req HTTP request context.
* @param {!express:Response} res HTTP response context.
*/
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "HTTP";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
};
Code that worked for the update:
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
var j = JSON.parse(i);
var k = JSON.stringify(j.Id);
var id = k.replace(/"/g, '');
console.log(k);
console.log(id);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "LEADS_STG";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
/*const bigqueryClient = new BigQuery();*/
var delayInMilliseconds = 1000;
setTimeout(function() {
bigquery.query({
query: [
'MERGE DEMO.LEADS_D T USING (SELECT ID,NAME,LEADSOURCE,COMPANY FROM DEMO.LEADS_STG where ID= ? order by LASTMODIFIEDDATE DESC LIMIT 1) S ON T.ID = S.ID WHEN MATCHED THEN UPDATE SET NAME = S.NAME, LEADSOURCE = S.LEADSOURCE, COMPANY = S.COMPANY WHEN NOT MATCHED THEN INSERT ( ID, NAME, LEADSOURCE, COMPANY) VALUES( ID, NAME,LEADSOURCE,COMPANY)'
].join(' '),
params: [
id
]
}, function(err, rows) {});
}, delayInMilliseconds);
};