If you are reading this, I hope you are familiar with Google Cloud PubSub, PubSub Topics and Schemas for those Topics. :).
When defining a schema for a topic in GC Pub/Sub, you have two choices for syntax: Avro and Protocol Buffer. I've been successful at using Avro, but when trying to use Protocol Buffer, I'm getting an error that I'm not sure how to fix.
Here's the schema in proto2 syntax:
syntax = "proto2";

message ProtocolBuffer {
  string event_name = 1;
  string user_id = 2;
}
That's pretty close to what you get out of the box when you start creating a schema in GC Pub/Sub and pick Protocol Buffer. I assume GC Pub/Sub doesn't like the proto3 format, since it defaults to proto2.
I have a NodeJS based Cloud Function that is invoked when a document is created in GC Firestore. My goal is to get the data from Firestore and into BigQuery.
Here's my code for the Cloud Function:
const Firestore = require('@google-cloud/firestore');
const { PubSub } = require('@google-cloud/pubsub');
const protobuf = require('protobufjs');

const firestore = new Firestore();
const pubsub = new PubSub();

exports.publishToBigQuery = async (event, context) => {
  console.log("event", JSON.stringify(event, 2, null));
  console.log("context", JSON.stringify(context, 2, null));
  const affectedDoc = firestore.doc(`messages/${context.params.documentId}`);
  try {
    const documentSnapshot = await affectedDoc.get();
    if (documentSnapshot.exists) {
      const firestoreData = documentSnapshot.data();
      const topic = pubsub.topic('firestore-document-created-with-proto-schema');
      const schema = pubsub.schema('event-pb-bq');
      const info = await schema.get();
      console.log('info', info);
      let root = new protobuf.Root();
      const type = protobuf.parse(info.definition);
      console.log('type', type);
      const ProtocolBuffer = type.root.lookupType('ProtocolBuffer');
      console.log('ProtocolBuffer', ProtocolBuffer);
      const message = ProtocolBuffer.create(firestoreData);
      console.log('message', message);
      const data = Buffer.from(message.toJSON());
      console.log('data', data);
      const value = await topic.publishMessage({data});
      console.log("Message published", value);
    } else {
      console.log("Document doesn't exist", JSON.stringify(affectedDoc));
    }
  } catch (error) {
    console.error("Error when fetching document", error);
  }
};
I get this error on the const type = protobuf.parse(info.definition); line. I have no idea if the later lines are correct; they are guesses. If the schema source can't be parsed, then I'm stuck.
Here's the error stack trace:
event {"oldValue":{},"updateMask":{},"value":{"createTime":"2022-09-09T16:08:59.107887Z","fields":{"event_name":{"stringValue":"fridayeventname"},"user_id":{"stringValue":"fridayuserid"}},"name":"projects/myproject/databases/(default)/documents/messages/jH9W7SQj2aLh7eK8lRCl","updateTime":"2022-09-09T16:08:59.107887Z"}}
context {"eventId":"a00ecca0-0740-4cf8-94bf-15828af8e180-0","eventType":"providers/cloud.firestore/eventTypes/document.create","notSupported":{},"params":{"documentId":"jH9W7SQj2aLh7eK8lRCl"},"resource":"projects/myproject/databases/(default)/documents/messages/jH9W7SQj2aLh7eK8lRCl","timestamp":"2022-09-09T16:08:59.107887Z"}
info {
name: 'projects/myproject/schemas/event-pb-bq',
type: 'PROTOCOL_BUFFER',
definition: 'syntax = "proto2";\n' +
'\n' +
'message ProtocolBuffer {\n' +
' string event_name = 1;\n' +
' string user_id = 2;\n' +
'}\n'
}
Error when fetching document Error: illegal token 'string' (line 4)
at illegal (/workspace/node_modules/protobufjs/src/parse.js:96:16)
at parseType_block (/workspace/node_modules/protobufjs/src/parse.js:347:31)
at ifBlock (/workspace/node_modules/protobufjs/src/parse.js:290:17)
at parseType (/workspace/node_modules/protobufjs/src/parse.js:308:9)
at parseCommon (/workspace/node_modules/protobufjs/src/parse.js:261:17)
at Object.parse (/workspace/node_modules/protobufjs/src/parse.js:829:21)
at exports.publishToBigQuery (/workspace/index.js:26:35)
I couldn't find an example anywhere that would retrieve the schema source from PubSub and then use that to format the pubsub message. Anyone have any ideas?
Thanks.
The issue is that Pub/Sub's schema validation is too permissive. The schema definition provided is not valid proto2, because proto2 requires each field to be labeled optional, repeated, or required, and these fields have no label. The protobuf parser for Node catches this, while Pub/Sub's validator implicitly treats the unlabeled fields as optional.
If you change the schema to the following, it should work:
syntax = "proto2";

message ProtocolBuffer {
  optional string event_name = 1;
  optional string user_id = 2;
}
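Since Pub/Sub accepts the unlabeled form, it may help to check a definition for missing labels before uploading it. The following is only a rough sketch: findUnlabeledFields is a hypothetical helper, not part of protobufjs or the Pub/Sub client, and it is a line-based heuristic rather than a real parser (for example, it would wrongly flag fields inside a oneof, which legitimately carry no label).

```javascript
// Heuristic pre-flight check (hypothetical helper, not a real protobuf parser):
// returns the names of proto2 fields declared without optional/required/repeated.
function findUnlabeledFields(definition) {
  // Matches "<type> <name> = <number>" with nothing in front of the type.
  const unlabeledField = /^([\w.]+)\s+(\w+)\s*=\s*\d+/;
  return definition
    .split('\n')
    .map((line) => unlabeledField.exec(line.trim()))
    .filter((match) => match !== null)
    .map((match) => match[2]); // capture group 2 is the field name
}

// The definition from the question.
const original = [
  'syntax = "proto2";',
  '',
  'message ProtocolBuffer {',
  '  string event_name = 1;',
  '  string user_id = 2;',
  '}',
].join('\n');

// The definition with explicit labels.
const corrected = [
  'syntax = "proto2";',
  '',
  'message ProtocolBuffer {',
  '  optional string event_name = 1;',
  '  optional string user_id = 2;',
  '}',
].join('\n');

console.log(findUnlabeledFields(original));  // [ 'event_name', 'user_id' ]
console.log(findUnlabeledFields(corrected)); // []
```

The definitive check is still to run the definition through the parser you intend to use, such as protobuf.parse from the question.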
For follow-up on improvements to the validator in this case, you can see the issue entered for it.
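As for the later lines that the question calls guesses: Buffer.from does not accept a plain object, so the result of message.toJSON() has to be serialized to a string first. Here is a minimal sketch, assuming the topic's message encoding is JSON. toJsonPayload is a hypothetical helper name, and the sample values are the ones from the logged event. For a topic with BINARY encoding, protobufjs's ProtocolBuffer.encode(message).finish() would produce the bytes instead.

```javascript
// Hypothetical helper: turn a plain object (for example the result of
// message.toJSON()) into the Buffer passed as `data` to topic.publishMessage.
// Assumes the topic's schema encoding is JSON, not BINARY.
function toJsonPayload(obj) {
  return Buffer.from(JSON.stringify(obj), 'utf8');
}

// Sample values taken from the event logged in the question.
const data = toJsonPayload({
  event_name: 'fridayeventname',
  user_id: 'fridayuserid',
});

console.log(data.toString('utf8'));
// {"event_name":"fridayeventname","user_id":"fridayuserid"}
```

In the Cloud Function, this would replace the Buffer.from(message.toJSON()) line, with the rest of the publish call left as it is.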