We are using protocol buffers (version 3) for our microservices communications.
We now want to start ingesting those messages into our data storage by sending them to Google's Pub/Sub.
As best as I can figure out, Pub/Sub requires a proto file with:
- only one top-level message
- no imports
This is a problem, since we have already created many complex protobuf objects and .proto files, and we never followed those two rules in the protobuf definitions.
Let me illustrate our problem with the following example:
syntax = "proto3";
package ourprotobuf.order.v1;
import "buf/validate/validate.proto";
import "ourprotobuf/common/amount/v1/amount.proto";
option java_multiple_files = true;
option java_package = "com.us.ourprotobuf.order.v1";
message Order {
  optional string id = 1;
  optional string merchant_order_reference = 2 [(buf.validate.field).required = true];
  TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
}
message TaxesAndFees {
  repeated Section sections = 1;
}
message Section {
  // NOTE: not required since it can be empty
  repeated SectionRow rows = 2;
  SectionTotalRow total_row = 3 [(buf.validate.field).required = true];
}
message SectionRow {
  optional string title = 1 [(buf.validate.field).required = true];
  common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
  optional string amount_descriptor = 3;
}
message SectionTotalRow {
  optional string title = 1 [(buf.validate.field).required = true];
  common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
}
The example .proto file is perfectly acceptable when working purely with protocol buffers as generated into the target languages. (We use a different target language for the data reporting team vs the application team, without any problems.)
But as far as I can determine, this is a problem when using Google's Pub/Sub because:
- There is an import of "ourprotobuf/common/amount/v1/amount.proto", which is a shared protobuf message being referenced.
- There are multiple messages (Order, TaxesAndFees, etc.) that are defined in a "hierarchical" format, but not all nested within other messages. It is clear to us, as humans, that Order is the "main" message that I would like Pub/Sub to recognize, but I do not know how to make that clear to Pub/Sub. I want Pub/Sub to know that, for instance, it should never expect a message of type SectionRow directly, but that SectionRow is a sub-message of Order.
So far, we are using this package to try to help us generate a proper schema for Pub/Sub. But, as far as I can manage, it doesn't seem able to fully generate a single message that incorporates internal modules in the package.
But I feel like I'm making this more complicated than it needs to be, because:
- I can create an Order object in Python that is completely correct, by importing the generated order_pb2.py file.
- Shouldn't it be possible to generate a .proto file message from an object? In other words, isn't there something similar to:
> my_obj = order_pb2.Order()
> my_obj.exportSchema('./order.proto')
where the generated file order.proto would have the full schema of the object in a single message? In other words, the exportSchema method would do the reverse of what protoc does.
It seems this must be possible. The full structure of the object is known from the Python object.
The on-the-wire messages produced from a tweaked (see below) copy of your protobuf sources and from the output of protoc-gen-pubsub-schema are identical for me, using Go and a tweaked version of your Order message.
Since the on-the-wire messages are identical, the schemas are equivalent.
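One way to see why identical bytes imply equivalent schemas: the protobuf wire format carries only field numbers, wire types and values, never message or field names. The following is a minimal Python sketch (standard library only; the helper names encode_varint and encode_string_field are made up for illustration and are not part of any protobuf library) that hand-encodes the first two fields of Order with the values used in this answer.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_string_field(field_number: int, text: str) -> bytes:
    """Encode a string field as tag, length, then UTF-8 bytes."""
    payload = text.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return encode_varint(tag) + encode_varint(len(payload)) + payload


# Field 1 (id) and field 2 (merchant_order_reference); no names are encoded.
prefix = encode_string_field(1, "ID") + encode_string_field(2, "MerchantOrderRef")
print(prefix.hex())
```

The printed hex, 0a02494412104d65726368616e744f72646572526566, is exactly how the marshaled message shown further down begins, and nothing in it depends on what the messages or fields are called.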
There is (at least) one issue: protoc-gen-pubsub-schema does not support optional fields (these were only recently added to proto3).
Once this change has been made and after suitable testing, it appears you could use protoc-gen-pubsub-schema to programmatically generate schemas suitable for Cloud Pub/Sub from your existing schema.
Proof:
protoc-gen-pubsub-schema requires a single top-level message in a protobuf source. This is because it is a plugin for protoc and there's no way to specify a single message when compiling protos.
Solution: create e.g. order.only.proto:
syntax = "proto3";
package ourprotobuf.order.v1;
import "buf/validate/validate.proto";
import "ourprotobuf/order/v1/order.proto";
option java_multiple_files = true;
option java_package = "com.us.ourprotobuf.order.v1";
option go_package = "{module}/protos";
message Order {
  string id = 1;
  string merchant_order_reference = 2 [(buf.validate.field).required = true];
  ourprotobuf.order.v1.TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
}
And revise order.proto to exclude Order:
syntax = "proto3";
package ourprotobuf.order.v1;
import "buf/validate/validate.proto";
import "ourprotobuf/common/amount/v1/amount.proto";
option java_multiple_files = true;
option java_package = "com.us.ourprotobuf.order.v1";
option go_package = "{module}/protos";
// message Order {
//   optional string id = 1;
//   optional string merchant_order_reference = 2 [(buf.validate.field).required = true];
//   TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
// }
message TaxesAndFees {
  repeated Section sections = 1;
}
message Section {
  // NOTE: not required since it can be empty
  repeated SectionRow rows = 2;
  SectionTotalRow total_row = 3 [(buf.validate.field).required = true];
}
message SectionRow {
  optional string title = 1 [(buf.validate.field).required = true];
  ourprotobuf.common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
  optional string amount_descriptor = 3;
}
message SectionTotalRow {
  optional string title = 1 [(buf.validate.field).required = true];
  ourprotobuf.common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
}
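Before running the plugin, it can help to confirm that a source file really declares just one top-level message. This is a naive Python sketch under stated assumptions: it handles only // line comments, ignores /* */ blocks and braces inside string literals, and the function name top_level_messages and the nested Inner message in the sample are hypothetical.

```python
import re


def top_level_messages(proto_source: str) -> list[str]:
    """Return the names of messages declared at the top level of a .proto source."""
    # Drop // line comments so commented-out messages are ignored.
    text = re.sub(r"//[^\n]*", "", proto_source)
    names: list[str] = []
    depth = 0
    # Visit message declarations and braces in source order, tracking nesting depth.
    for match in re.finditer(r"\bmessage\s+(\w+)\s*\{|[{}]", text):
        if match.group(0) == "}":
            depth -= 1
            continue
        if match.group(1) is not None and depth == 0:
            names.append(match.group(1))
        depth += 1  # both "message X {" and a bare "{" open a scope
    return names


sample = """
syntax = "proto3";
package ourprotobuf.order.v1;
// message Order {
// }
message TaxesAndFees {
  repeated Section sections = 1;
}
message Section {
  message Inner {}
}
"""
print(top_level_messages(sample))
```

A file is a candidate input for the plugin when the returned list has exactly one entry; for the sample above, the commented-out Order and the nested Inner are not counted, leaving TaxesAndFees and Section.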
NOTE I'm using Go rather than Java and Go requires option go_package ...
protoc \
--proto_path=${PWD}/protos \
--go_out=${PWD} \
--go_opt=module=${MODULE} \
${PWD}/protos/ourprotobuf/order/v1/order.only.proto \
${PWD}/protos/ourprotobuf/order/v1/order.proto \
${PWD}/protos/ourprotobuf/common/amount/v1/amount.proto
NOTE Absent your definition of ourprotobuf.common.amount.v1.Amount, I created a stub amount.proto.
package main
import (
	"fmt"
	"log/slog"
	"google.golang.org/protobuf/proto"
	pb "{module}/protos"
)
func pString(s string) *string {
	return &s
}
func main() {
	order := &pb.Order{
		Id:                     "ID",
		MerchantOrderReference: "MerchantOrderRef",
		TaxesAndFees: &pb.TaxesAndFees{
			Sections: []*pb.Section{
				{
					Rows: []*pb.SectionRow{{
						Title: pString("Title"),
						Amount: &pb.Amount{
							Title: pString("Title"),
						},
					}},
					TotalRow: &pb.SectionTotalRow{
						Title: pString("Title"),
						Amount: &pb.Amount{
							Title: pString("Title"),
						},
					},
				},
			},
		},
	}
	slog.Info("order", "order", order)
	b, err := proto.Marshal(order)
	if err != nil {
		slog.Error("unable to marshal order",
			"error", err,
		)
	}
	slog.Info("marshaled message",
		"order", fmt.Sprintf("%x", b),
	)
}
Yields:
2023/12/04 15:19:26 INFO order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
2023/12/04 15:19:26 INFO marshaled message order=0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65
The on-the-wire message is 0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65
Use e.g. Protobuf Decoder with this message.
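If you would rather inspect the bytes locally, the outer structure can be walked with a few lines of Python. This is a minimal sketch using only the standard library; read_varint and length_delimited_fields are illustrative names, and the sketch assumes every field is length-delimited (wire type 2), which holds for this particular message but not for protobuf messages in general.

```python
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next position)."""
    value, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos
        shift += 7


def length_delimited_fields(data: bytes) -> list[tuple[int, bytes]]:
    """List (field number, payload) pairs, assuming all fields use wire type 2."""
    fields = []
    pos = 0
    while pos < len(data):
        tag, pos = read_varint(data, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type != 2:
            raise ValueError(f"unsupported wire type {wire_type}")
        length, pos = read_varint(data, pos)
        fields.append((field_number, data[pos:pos + length]))
        pos += length
    return fields


wire = bytes.fromhex(
    "0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c65"
    "12070a055469746c651a100a055469746c6512070a055469746c65"
)
for number, payload in length_delimited_fields(wire):
    print(number, len(payload))
```

The top level contains fields 1, 2 and 4, matching id, merchant_order_reference and taxes_and_fees. Calling length_delimited_fields again on the payload of field 4 descends into TaxesAndFees, and so on down to the Title strings.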
Generate Pub/Sub-equivalent (!) schema using protoc-gen-pubsub-schema:
protoc \
--proto_path=${PWD}/protos \
--pubsub-schema_out=${PWD}/pubsub \
--pubsub-schema_opt=message-encoding-json \
--pubsub-schema_opt=schema-syntax=proto3 \
${PWD}/protos/ourprotobuf/order/v1/order.only.proto
NOTE To avoid naming conflicts, these Go sources are generated to the pubsub folder (package) rather than protos.
# Need to include `option go_package` in generated proto
sed \
--in-place \
--expression="s|package ourprotobuf.order.v1;|package ourprotobuf.order.v1;\n\noption go_package = \"${MODULE}/pubsub\";|g" \
${PWD}/pubsub/ourprotobuf/order/v1/order.only.pps
# Compile pubsub/ourprotobuf/order/v1/order.only.pps
# Generate Go sources in pubsub folder
protoc \
--proto_path=${PWD}/pubsub \
--go_out=${PWD} \
--go_opt=module=${MODULE} \
${PWD}/pubsub/ourprotobuf/order/v1/order.only.pps
package main
import (
	"encoding/hex"
	"fmt"
	"log/slog"
	"google.golang.org/protobuf/proto"
	pb "{module}/pubsub"
)
// The on-the-wire message (hex-encoded) generated using the tweaked original schema
const wireHex = "0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65"
func main() {
	order := &pb.Order{
		Id:                     "ID",
		MerchantOrderReference: "MerchantOrderRef",
		TaxesAndFees: &pb.Order_OurprotobufOrderV1TaxesAndFees{
			Sections: []*pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section{
				{
					Rows: []*pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionRow{{
						Title: "Title",
						Amount: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionRow_OurprotobufCommonAmountV1Amount{
							Title: "Title",
						},
					}},
					TotalRow: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionTotalRow{
						Title: "Title",
						Amount: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionTotalRow_OurprotobufCommonAmountV1Amount{
							Title: "Title",
						},
					},
				},
			},
		},
	}
	slog.Info("order", "order", order)
	// Marshal with the Pub/Sub-compatible schema to compare the bytes
	b, err := proto.Marshal(order)
	if err != nil {
		slog.Error("unable to marshal order",
			"error", err,
		)
	}
	slog.Info("marshaled message",
		"order", fmt.Sprintf("%x", b),
	)
	// Decode the hex string into the raw bytes produced by the original schema
	wire, err := hex.DecodeString(wireHex)
	if err != nil {
		slog.Error("unable to decode hex message",
			"error", err,
		)
	}
	// Unmarshal the original schema's bytes with the Pub/Sub-compatible schema
	psOrder := &pb.Order{}
	if err := proto.Unmarshal(wire, psOrder); err != nil {
		slog.Error("unable to unmarshal order into Pub/Sub message",
			"error", err,
		)
	}
	slog.Info("Pub/Sub order", "order", psOrder)
}
Yields:
2023/12/04 15:26:09 INFO order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
2023/12/04 15:26:09 INFO marshaled message order=0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65
2023/12/04 15:26:09 INFO Pub/Sub order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
NOTE This code works: the Pub/Sub-compatible schema is able to unmarshal the message generated by the tweaked original schema and (redundantly, but for completeness) it also generates the same on-the-wire message.