protocol-buffers, google-cloud-pubsub

Create a single .proto file defining a message for Pub/Sub


We are using protocol buffers (version 3) for communication between our microservices.

We now want to start ingesting those messages into our data storage by sending them to Google's Pub/Sub.

As far as I can tell, Pub/Sub requires a proto file with:

  • only one message defined
  • that message fully self-contained in the schema that is passed (no definitions pulled in from other .proto files)
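To make those two rules concrete, here is a rough Go sketch that counts top-level message declarations and import statements in a .proto source. The helper name checkPubSubProto is hypothetical (not part of any Google tool), and it is a brace-counting heuristic rather than a real parser: it only strips // comments and ignores string literals, so treat its result as a hint.

```go
package main

import (
	"fmt"
	"strings"
)

// protoSummary holds what the heuristic found in a .proto source.
type protoSummary struct {
	TopLevelMessages []string // message names declared at brace depth 0
	Imports          []string // paths from `import "..."` statements
}

// checkPubSubProto is a hypothetical brace-counting heuristic, not a parser.
// It drops // comments, then records `message X {` and `import "..."`
// statements that appear at brace depth 0.
func checkPubSubProto(src string) protoSummary {
	var s protoSummary
	depth := 0
	for _, line := range strings.Split(src, "\n") {
		if i := strings.Index(line, "//"); i >= 0 {
			line = line[:i] // drop line comments
		}
		fields := strings.Fields(line)
		if depth == 0 && len(fields) >= 2 {
			switch fields[0] {
			case "message":
				s.TopLevelMessages = append(s.TopLevelMessages, strings.TrimSuffix(fields[1], "{"))
			case "import":
				s.Imports = append(s.Imports, strings.Trim(fields[len(fields)-1], `";`))
			}
		}
		depth += strings.Count(line, "{") - strings.Count(line, "}")
	}
	return s
}

func main() {
	src := `syntax = "proto3";
import "buf/validate/validate.proto";
import "ourprotobuf/common/amount/v1/amount.proto";
message Order { TaxesAndFees taxes_and_fees = 4; }
message TaxesAndFees { repeated Section sections = 1; }
message Section { repeated SectionRow rows = 2; }
`
	s := checkPubSubProto(src)
	fmt.Println("top-level messages:", s.TopLevelMessages)
	fmt.Println("imports:", s.Imports)
	fmt.Println("looks Pub/Sub-ready:", len(s.TopLevelMessages) == 1 && len(s.Imports) == 0) // false here
}
```

Applied to the Order file, it reports five top-level messages and two imports, i.e. both rules are broken.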

This is a problem, since we have already created many complex protobuf objects and .proto files, and our protobuf definitions never followed those two rules.


Here is an example that illustrates our problem:

syntax = "proto3";

package ourprotobuf.order.v1;

import "buf/validate/validate.proto";
import "ourprotobuf/common/amount/v1/amount.proto";

option java_multiple_files = true;
option java_package = "com.us.ourprotobuf.order.v1";

message Order {
  optional string id = 1;
  optional string merchant_order_reference = 2 [(buf.validate.field).required = true];
  TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
}

message TaxesAndFees {
  repeated Section sections = 1;
}

message Section {
  // NOTE: not required since it can be empty
  repeated SectionRow rows = 2;
  SectionTotalRow total_row = 3 [(buf.validate.field).required = true];
}

message SectionRow {
  optional string title = 1 [(buf.validate.field).required = true];
  common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
  optional string amount_descriptor = 3;
}

message SectionTotalRow {
  optional string title = 1 [(buf.validate.field).required = true];
  common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
}

The example .proto file works perfectly well when we only use the code that protoc generates for our target languages. (The data reporting team and the application team use different target languages, without any problems.)

But as far as I can determine, this is a problem when using Google's Pub/Sub because:

  1. The file above pulls in definitions from other files.
    • E.g., it references "ourprotobuf/common/amount/v1/amount.proto", a shared protobuf message definition.
  2. The file defines several messages (Order, TaxesAndFees, etc.) that form a hierarchy, but they are not nested inside one another. It is clear to us, as humans, that Order is the "main" message that I would like Pub/Sub to recognize, but I do not know how to make that clear to Pub/Sub. I want Pub/Sub to know, for instance, that it should never expect a SectionRow message directly, because SectionRow is only a sub-message of Order.
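One way to make "Order is the main message" mechanical rather than a human judgement is to treat each message as a node, each message-typed field as an edge, and pick the messages that no other message references. A minimal stdlib sketch, assuming the reference graph has already been extracted (the refs map below is written by hand from the example, not produced by any tool, and roots is a hypothetical helper name):

```go
package main

import (
	"fmt"
	"sort"
)

// roots returns the messages that no other message references.
// refs maps a fully qualified message name to the message types its fields use.
func roots(refs map[string][]string) []string {
	referenced := map[string]bool{}
	for _, targets := range refs {
		for _, t := range targets {
			referenced[t] = true
		}
	}
	var out []string
	for msg := range refs {
		if !referenced[msg] {
			out = append(out, msg)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	// Hand-written from the Order example above.
	refs := map[string][]string{
		"ourprotobuf.order.v1.Order":           {"ourprotobuf.order.v1.TaxesAndFees"},
		"ourprotobuf.order.v1.TaxesAndFees":    {"ourprotobuf.order.v1.Section"},
		"ourprotobuf.order.v1.Section":         {"ourprotobuf.order.v1.SectionRow", "ourprotobuf.order.v1.SectionTotalRow"},
		"ourprotobuf.order.v1.SectionRow":      {"ourprotobuf.common.amount.v1.Amount"},
		"ourprotobuf.order.v1.SectionTotalRow": {"ourprotobuf.common.amount.v1.Amount"},
		"ourprotobuf.common.amount.v1.Amount":  nil,
	}
	fmt.Println(roots(refs)) // [ourprotobuf.order.v1.Order]
}
```

Here the result is a single root, so the intended top-level message is unambiguous; a file with several roots would still need someone to choose.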

So far, we have been trying to use this package to help us generate a proper schema for Pub/Sub. However, as far as I can tell, it cannot generate a single message that incorporates the other messages (internal modules) in the package.


But I feel like I'm making this more complicated than it needs to be, because:

  • I am able to work with the proper protobuf objects in the languages that we're using. E.g., I can create a completely correct Order object in Python by importing the generated order_pb2.py file.
  • I feel there must be a way to simply export a complete .proto message definition from an object. In other words, isn't there something similar to:
> my_obj = order_pb2.Order()
> my_obj.exportSchema('./order.proto')

where the generated file order.proto would have the full schema of the object in a single message? In other words, the exportSchema method would do the reverse of what protoc does.

It seems this must be possible. The full structure of the object is known from the Python object.


Solution

  • Using Go, I found that the on-the-wire messages produced by a tweaked copy of your protobuf sources (see below) and by the schema output from protoc-gen-pubsub-schema are identical.

    Since the on-the-wire messages are identical, the schemas are equivalent, at least for messages like this one.

    There is (at least) one issue: protoc-gen-pubsub-schema does not support proto3 optional fields (which were added to proto3 relatively recently). This is why the tweaked Order below declares its fields without optional.

    You may wish to raise this issue on the GitHub repo.

    Once this change has been made and after suitable testing, it appears you could use protoc-gen-pubsub-schema to programmatically generate schemas suitable for Cloud Pub/Sub from your existing schema.
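The core of such a generator is mechanical. Below is a minimal sketch of the idea using only the Go standard library and a hypothetical in-memory model (Field, schema, mangle, emit and flatten are my own names, not a protobuf API). Each referenced message is nested inside the message that uses it and renamed after its fully qualified name, which matches the nesting and naming visible in the generated Go types further down; it is not protoc-gen-pubsub-schema's actual code. Enums, maps, oneofs, field options and recursive types are not handled.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// Field is a deliberately tiny, hypothetical stand-in for a field descriptor.
type Field struct {
	Label  string // "", "optional" or "repeated"
	Type   string // scalar type such as "string", or a fully qualified message name
	Name   string
	Number int
}

// schema maps fully qualified message names to their fields.
type schema map[string][]Field

// mangle turns "ourprotobuf.order.v1.TaxesAndFees" into
// "OurprotobufOrderV1TaxesAndFees" by upper-casing each dotted segment.
func mangle(fullName string) string {
	var b strings.Builder
	for _, part := range strings.Split(fullName, ".") {
		r := []rune(part)
		if len(r) == 0 {
			continue
		}
		r[0] = unicode.ToUpper(r[0])
		b.WriteString(string(r))
	}
	return b.String()
}

// emit writes message `full` under `name`, nesting a copy of every message
// type it references, so the output needs no imports and has one top level.
func emit(b *strings.Builder, s schema, full, name, indent string, onPath map[string]bool) {
	if onPath[full] {
		panic("recursive message types are not handled by this sketch: " + full)
	}
	onPath[full] = true
	defer delete(onPath, full)

	fmt.Fprintf(b, "%smessage %s {\n", indent, name)
	nested := map[string]bool{}
	for _, f := range s[full] {
		if _, isMsg := s[f.Type]; isMsg && !nested[f.Type] {
			nested[f.Type] = true
			emit(b, s, f.Type, mangle(f.Type), indent+"  ", onPath)
		}
	}
	for _, f := range s[full] {
		typ := f.Type
		if _, isMsg := s[typ]; isMsg {
			typ = mangle(typ)
		}
		label := ""
		if f.Label != "" {
			label = f.Label + " "
		}
		fmt.Fprintf(b, "%s  %s%s %s = %d;\n", indent, label, typ, f.Name, f.Number)
	}
	fmt.Fprintf(b, "%s}\n", indent)
}

// flatten renders root, and everything it references, as one proto3 message.
func flatten(s schema, root, pkg string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "syntax = \"proto3\";\n\npackage %s;\n\n", pkg)
	short := root[strings.LastIndex(root, ".")+1:]
	emit(&b, s, root, short, "", map[string]bool{})
	return b.String()
}

func main() {
	const o = "ourprotobuf.order.v1."
	const amount = "ourprotobuf.common.amount.v1.Amount"
	s := schema{
		o + "Order": {
			{"optional", "string", "id", 1},
			{"optional", "string", "merchant_order_reference", 2},
			{"", o + "TaxesAndFees", "taxes_and_fees", 4},
		},
		o + "TaxesAndFees": {{"repeated", o + "Section", "sections", 1}},
		o + "Section":      {{"repeated", o + "SectionRow", "rows", 2}, {"", o + "SectionTotalRow", "total_row", 3}},
		o + "SectionRow": {
			{"optional", "string", "title", 1},
			{"", amount, "amount", 2},
			{"optional", "string", "amount_descriptor", 3},
		},
		o + "SectionTotalRow": {{"optional", "string", "title", 1}, {"", amount, "amount", 2}},
		amount:                {{"optional", "string", "title", 1}}, // assumed stub Amount
	}
	fmt.Print(flatten(s, o+"Order", "ourprotobuf.order.v1"))
}
```

Running it prints a single top-level message Order with TaxesAndFees, Section, SectionRow and SectionTotalRow nested inside, and a separate copy of Amount inside each of SectionRow and SectionTotalRow.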

    Proof:

    1. protoc-gen-pubsub-schema requires a single top-level message in a protobuf source. This is because it is a plugin for protoc, and there's no way to specify a single message when compiling protos.

    Solution: create a separate file, e.g. order.only.proto, that contains only Order:

    syntax = "proto3";
    
    package ourprotobuf.order.v1;
    
    import "buf/validate/validate.proto";
    import "ourprotobuf/order/v1/order.proto";
    
    option java_multiple_files = true;
    option java_package = "com.us.ourprotobuf.order.v1";
    
    option go_package = "{module}/protos";
    
    message Order {
      string id = 1;
      string merchant_order_reference = 2 [(buf.validate.field).required = true];
      ourprotobuf.order.v1.TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
    }
    

    And revise order.proto to exclude Order:

    syntax = "proto3";
    
    package ourprotobuf.order.v1;
    
    import "buf/validate/validate.proto";
    import "ourprotobuf/common/amount/v1/amount.proto";
    
    option java_multiple_files = true;
    option java_package = "com.us.ourprotobuf.order.v1";
    
    option go_package = "{module}/protos";
    
    // message Order {
    //   optional string id = 1;
    //   optional string merchant_order_reference = 2 [(buf.validate.field).required = true];
    //   TaxesAndFees taxes_and_fees = 4 [(buf.validate.field).required = true];
    // }
    
    message TaxesAndFees {
      repeated Section sections = 1;
    }
    
    message Section {
      // NOTE: not required since it can be empty
      repeated SectionRow rows = 2;
      SectionTotalRow total_row = 3 [(buf.validate.field).required = true];
    }
    
    message SectionRow {
      optional string title = 1 [(buf.validate.field).required = true];
      ourprotobuf.common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
      optional string amount_descriptor = 3;
    }
    
    message SectionTotalRow {
      optional string title = 1 [(buf.validate.field).required = true];
      ourprotobuf.common.amount.v1.Amount amount = 2 [(buf.validate.field).required = true];
    }
    

    NOTE: I'm using Go rather than Java, and Go requires option go_package.

    2. Compile these equivalent sources:
    protoc \
    --proto_path=${PWD}/protos \
    --go_out=${PWD} \
    --go_opt=module=${MODULE} \
    ${PWD}/protos/ourprotobuf/order/v1/order.only.proto \
    ${PWD}/protos/ourprotobuf/order/v1/order.proto \
    ${PWD}/protos/ourprotobuf/common/amount/v1/amount.proto
    

    NOTE: Since I don't have your definition of ourprotobuf.common.amount.v1.Amount, I created a stub amount.proto.

    3. Use the Go sources to generate a sample message:
    package main
    
    import (
        "fmt"
        "log/slog"
    
        "google.golang.org/protobuf/proto"
    
        pb "{module}/protos"
    )
    
    func pString(s string) *string {
        return &s
    }
    func main() {
        order := &pb.Order{
            Id:                     "ID",
            MerchantOrderReference: "MerchantOrderRef",
            TaxesAndFees: &pb.TaxesAndFees{
                Sections: []*pb.Section{
                    {
                        Rows: []*pb.SectionRow{{
                            Title: pString("Title"),
                            Amount: &pb.Amount{
                                Title: pString("Title"),
                            },
                        }},
                        TotalRow: &pb.SectionTotalRow{
                            Title: pString("Title"),
                            Amount: &pb.Amount{
                                Title: pString("Title"),
                            },
                        },
                    },
                },
            },
        }
    
        slog.Info("order", "order", order)
    
        b, err := proto.Marshal(order)
        if err != nil {
            slog.Error("unable to marshal order",
                "error", err,
            )
            return
        }
    
        slog.Info("marshaled message",
            "order", fmt.Sprintf("%x", b),
        )
    }
    

    Yields:

    2023/12/04 15:19:26 INFO order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
    2023/12/04 15:19:26 INFO marshaled message order=0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65
    

    The on-the-wire message is 0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65

    4. Decode this message with a tool such as Protobuf Decoder to inspect its fields.

    5. Generate the Pub/Sub-equivalent (!) schema using protoc-gen-pubsub-schema:

    protoc \
    --proto_path=${PWD}/protos \
    --pubsub-schema_out=${PWD}/pubsub \
    --pubsub-schema_opt=message-encoding-json \
    --pubsub-schema_opt=schema-syntax=proto3 \
    ${PWD}/protos/ourprotobuf/order/v1/order.only.proto
    

    NOTE: To avoid naming conflicts, these Go sources are generated into the pubsub folder (package) rather than protos.

    6. Compile the Pub/Sub-equivalent schema to Go source:
    # Need to include `option go_package` in generated proto
    sed \
    --in-place \
    --expression="s|package ourprotobuf.order.v1;|package ourprotobuf.order.v1;\n\noption go_package = \"${MODULE}/pubsub\";|g" \
    ${PWD}/pubsub/ourprotobuf/order/v1/order.only.pps
    
    # Compile pubsub/ourprotobuf/order/v1/order.only.pps
    # Generate Go sources in pubsub folder
    protoc \
    --proto_path=${PWD}/pubsub \
    --go_out=${PWD} \
    --go_opt=module=${MODULE} \
    ${PWD}/pubsub/ourprotobuf/order/v1/order.only.pps
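The sed call above uses GNU sed long options (--in-place, --expression), which BSD/macOS sed may not accept. If portability matters, the same edit can be done with a tiny Go program. A sketch, assuming it is built as a standalone tool (addGoPackage is my own name); unlike the sed command, it leaves files that already declare a go_package untouched:

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// addGoPackage inserts `option go_package = "<goPkg>";` after the first
// line that starts with "package ", like the sed command above.
// Sources that already declare a go_package are returned unchanged.
func addGoPackage(src, goPkg string) string {
	if strings.Contains(src, "option go_package") {
		return src
	}
	lines := strings.Split(src, "\n")
	for i, l := range lines {
		if strings.HasPrefix(strings.TrimSpace(l), "package ") {
			lines[i] = l + "\n\n" + fmt.Sprintf("option go_package = %q;", goPkg)
			break
		}
	}
	return strings.Join(lines, "\n")
}

func main() {
	// Hypothetical usage: go run . pubsub/ourprotobuf/order/v1/order.only.pps "${MODULE}/pubsub"
	if len(os.Args) != 3 {
		fmt.Println("usage: addgopkg FILE GO_PACKAGE")
		return
	}
	path, goPkg := os.Args[1], os.Args[2]
	src, err := os.ReadFile(path)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if err := os.WriteFile(path, []byte(addGoPackage(string(src), goPkg)), 0o644); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```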
    
    7. Generate a message using this schema, and unmarshal the message generated with the tweaked original schema into it:
    package main
    
    import (
        "encoding/hex"
        "fmt"
        "log/slog"
    
        "google.golang.org/protobuf/proto"
    
        pb "{module}/pubsub"
    )
    
    // The on-the-wire message generated using the tweaked original schema (hex-encoded)
    const originalHex = "0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65"
    
    func main() {
        order := &pb.Order{
            Id:                     "ID",
            MerchantOrderReference: "MerchantOrderRef",
            TaxesAndFees: &pb.Order_OurprotobufOrderV1TaxesAndFees{
                Sections: []*pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section{
                    {
                        Rows: []*pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionRow{{
                            Title: "Title",
                            Amount: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionRow_OurprotobufCommonAmountV1Amount{
                                Title: "Title",
                            },
                        }},
                        TotalRow: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionTotalRow{
                            Title: "Title",
                            Amount: &pb.Order_OurprotobufOrderV1TaxesAndFees_OurprotobufOrderV1Section_OurprotobufOrderV1SectionTotalRow_OurprotobufCommonAmountV1Amount{
                                Title: "Title",
                            },
                        },
                    },
                },
            },
        }
    
        slog.Info("order", "order", order)
    
        b, err := proto.Marshal(order)
        if err != nil {
            slog.Error("unable to marshal order",
                "error", err,
            )
            return
        }
    
        slog.Info("marshaled message",
            "order", fmt.Sprintf("%x", b),
        )
    
        // Decode the hex text into the raw bytes produced with the tweaked original schema
        original, err := hex.DecodeString(originalHex)
        if err != nil {
            slog.Error("unable to decode original message",
                "error", err,
            )
            return
        }
    
        psOrder := &pb.Order{}
        if err := proto.Unmarshal(original, psOrder); err != nil {
            slog.Error("unable to unmarshal order into Pub/Sub message",
                "error", err,
            )
            return
        }
    
        slog.Info("Pub/Sub order", "order", psOrder)
    }
    

    Yields:

    2023/12/04 15:26:09 INFO order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
    2023/12/04 15:26:09 INFO marshaled message order=0a02494412104d65726368616e744f7264657252656622260a2412100a055469746c6512070a055469746c651a100a055469746c6512070a055469746c65
    2023/12/04 15:26:09 INFO Pub/Sub order order="id:\"ID\" merchant_order_reference:\"MerchantOrderRef\" taxes_and_fees:{sections:{rows:{title:\"Title\" amount:{title:\"Title\"}} total_row:{title:\"Title\" amount:{title:\"Title\"}}}}"
    

    NOTE: This code works. The Pub/Sub-compatible schema can unmarshal the message generated by the tweaked original schema and (redundantly, but for completeness) it also generates the same on-the-wire message.