Tags: c#, .net, go, protocol-buffers

proto: cannot parse invalid wire-format data


I'm new to protobufs and am currently writing a client that reads from a NATS server. The data published to the NATS server is protobuf-encoded.

The client I'm writing is in Go. This is the .proto file I've written:

syntax = "proto3";

package execution;

option go_package = "./protos/execution";

enum OrderStatus {
  Working = 0;
  Rejected = 1;
  Cancelled = 2;
  Completed = 3;
}

enum OrderType {
  Limit = 0;
  Market = 1;
  StopLimit = 2;
  StopMarket = 3;
}

enum OrderSide {
  Buy = 0;
  Sell = 1;
}

enum RejectReason {
  NoRejection = 0;
  InstrumentNotFound = 1;
  OrderNotFound = 2;
  InvalidOrderType = 3;
  InvalidAccount = 4;
  InvalidSide = 5;
  InvalidAmount = 6;
  InvalidLimitPrice = 7;
  InvalidQuoteLimit = 8;
  InvalidActivationPrice = 9;
  InvalidTimeInForce = 10;
  MarketHalted = 11;
  MarketPaused = 12;
  NoCounterOrders = 13;
  MissingExpirationTime = 14;
  IncorrectExpirationTime = 15;
  InternalError = 16;
  IllegalStatusSwitch = 17;
  OrderAlreadyExists = 18;
  InstrumentNotReady = 19;
  ExternalSystemError = 20;
}

enum ReportCause {
  NONE = 0;
  NewOrder = 1;
  CancelOrder = 2;
  MassCancel = 3;
  Expiration = 4;
  Trigger = 5;
  MarketStatusChange = 6;
}

enum TimeInForce {
  GoodTillCancel = 0;
  ImmediateOrCancel = 1;
  FillOrKill = 2;
}

enum CancelReason {
  NotCancelled = 0;
  CancelledByTrader = 1;
  CancelledBySystem = 2;
  SelfMatchPrevention = 3;
  OrderTimeInForce = 4;
  Liquidation = 100;
}


message TradeData {
  int64 TradeId = 1;
  string Amount = 4;
  string ExecutionPrice = 5;
  OrderStatus OrderStatus = 7;
  int64 AccountId = 11;
  string MatchedOrderExternalId = 14;
  int64 MatchedOrderId = 16;
  string RemainingAmount = 17;
}

message Execution {
  string Origin = 4;
  OrderSide Side = 7;
  string RequestedPrice = 8;
  string RequestedAmount = 9;
  string RemainingAmount = 10;
  int64 ExecutedAt = 13;
  OrderStatus OrderStatus = 14;
  repeated TradeData Trades = 16;
  OrderType OrderType = 20;
  int64 Version = 22;
  int64 AccountId = 23;
  RejectReason RejectReason = 25;
  ReportCause ReportCause = 26;
  string InstructionId = 27;
  string ExternalOrderId = 28;
  int32 ExecutionEngineMarketId = 29;
  int64 OrderId = 30;
  CancelReason CancelReason = 31;
  int64 TxId = 32;
  TimeInForce TimeInForce = 34;
  string CancelledBy = 35;
}

The publishing server is written in C# (using protobuf-net), and its message contract looks like this:

[ProtoContract]
public class ExecutionReport : IMarketResponse, IInstructionMessage, IOrderMatcherResponse
{
    [ProtoIgnore]
    FeedMessageType IFeedMessage.Type => FeedMessageType.ExecutionReport;

    // ReSharper disable FieldCanBeMadeReadOnly.Global
    [ProtoMember(4)] public string Origin;
    [ProtoMember(7)] public OrderSide Side;
    [ProtoMember(8)] public decimal RequestedPrice;
    [ProtoMember(9)] public decimal RequestedAmount;
    [ProtoMember(10)] public decimal RemainingAmount;
    [ProtoMember(13)] public long ExecutedAt;
    [ProtoMember(14)] public OrderStatus OrderStatus;
    [ProtoMember(16)] public List<TradeData> Trades = new List<TradeData>();
    [ProtoMember(20)] public OrderType OrderType;
    [ProtoMember(22)] public long Version { get; set; }
    [ProtoMember(23)] public long AccountId;
    [ProtoMember(25)] public RejectReason RejectReason;
    [ProtoMember(26)] public ReportCause ReportCause;
    [ProtoMember(27)] public Guid InstructionId { get; set; }
    [ProtoMember(28)] public Guid ExternalOrderId;
    [ProtoMember(29)] public int ExecutionEngineMarketId { get; set; }
    [ProtoMember(30)] public long OrderId;
    [ProtoMember(31)] public CancelReason CancelReason;
    [ProtoMember(32)] public long TxId;
    [ProtoMember(34)] public TimeInForce TimeInForce;
    [ProtoMember(35)] public string CancelledBy;
}

[ProtoContract]
[StructLayout(LayoutKind.Sequential)]
public struct TradeData
{
    [ProtoMember(1)] public long TradeId;
    [ProtoMember(4)] public decimal Amount;
    [ProtoMember(5)] public decimal ExecutionPrice;
    [ProtoMember(7)] public OrderStatus OrderStatus;
    [ProtoMember(11)] public long AccountId;
    [ProtoMember(14)] public Guid MatchedOrderExternalId;
    [ProtoMember(16)] public long MatchedOrderId;
    [ProtoMember(17)] public decimal RemainingAmount;
}

While trying to unmarshal the data, I get this error:

proto: cannot parse invalid wire-format data

This is how I'm parsing the data:

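_, err = sc.Subscribe("EXEC", func(m *stan.Msg) {
    varr := &protos.Execution{}
    // Use a local err so the callback doesn't write to the outer variable.
    if err := proto.Unmarshal(m.Data, varr); err != nil {
        fmt.Printf("Err unmarshalling!: %v\n\n", err)
        return
    }
    fmt.Printf("Received a message: %+v\n", varr)
})
if err != nil {
    log.Fatalf("subscribe failed: %v", err)
}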

A sample of the raw bytes I'm receiving from the server:

[5 85 0 0 0 56 1 66 3 8 144 78 74 2 8 1 82 2 8 1 104 197 192 132 194 159 143 219 237 8 176 1 25 184 1 11 208 1 1 218 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 226 1 18 9 133 66 138 247 239 67 93 77 17 176 192 189 75 170 203 186 145 232 1 1 240 1 25 128 2 25]

Adding more details:

This is how the C# side sends the data:

public async Task SendAsync(IFeedMessage msg)
{
    var subject = FeedSubject.ForMessage(msg);
    var data = msg.SerializeToArray();
    using (_metrics.FeedSendLatency.Start(new MetricTags("subject", subject.Value)))
    {
        await _connection.PublishAsync(subject, data);
    }
}

This is the IFeedMessage interface (which ExecutionReport also implements, indirectly):

public interface IFeedMessage
{
    FeedMessageType Type { get; }
    IFeedMessage Clone();
    void Reset();
}

This is how SerializeToArray() works:

public static ArraySegment<byte> SerializeToArray(this IFeedMessage message)
{
    return message.SerializeToMemory(new MemoryStream());
}

public static ArraySegment<byte> SerializeToMemory(this IFeedMessage message, MemoryStream stream)
{
    var start = stream.Position;
    message.Serialize(stream);
    return new ArraySegment<byte>(stream.GetBuffer(), (int)start, (int)(stream.Position - start));
}

public static void Serialize(this IFeedMessage message, Stream stream)
{
    stream.WriteByte((byte)message.Type);
    RuntimeTypeModel.Default.SerializeWithLengthPrefix(stream, message, message.GetType(), PrefixStyle.Fixed32, 0);
}
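So Serialize writes one type byte and then a length-prefixed protobuf message, which means m.Data on the Go side is not a bare protobuf message. A Go sketch of the same byte layout, for illustration only: buildFrame is a hypothetical helper, and it assumes that PrefixStyle.Fixed32 writes the length as a little-endian uint32 (consistent with the sample above, whose header 85 0 0 0 matches its 85-byte payload).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// buildFrame reproduces the layout written by the C# Serialize extension:
// [1 byte message type][4-byte little-endian payload length][payload].
// Hypothetical helper; the little-endian length is an assumption about
// protobuf-net's PrefixStyle.Fixed32.
func buildFrame(msgType byte, payload []byte) []byte {
	frame := make([]byte, 5, 5+len(payload))
	frame[0] = msgType
	binary.LittleEndian.PutUint32(frame[1:5], uint32(len(payload)))
	return append(frame, payload...)
}

func main() {
	// Tiny example payload: field 7 (Side), varint value 1.
	payload := []byte{56, 1}
	fmt.Println(buildFrame(5, payload)) // [5 2 0 0 0 56 1]
}
```

Passing such a frame straight to proto.Unmarshal makes the decoder treat the type byte and the length bytes as protobuf fields.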

I'm not sure of the exact cause, but it seems the .proto file I've written is wrong. I went through a few posts about the same error, but most of them don't address this issue. Let me know if any other details are needed.

Please help me with this.


Solution

    Based on the discussion in the comments, I managed to unmarshal the data.

    Notes:

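    1. The data is prefixed with 5 bytes that are not part of the protobuf message (and are unnecessary, since NATS already delimits each message):
      • 1 byte for the message type
      • 4 bytes for the data length (a little-endian uint32, written by PrefixStyle.Fixed32)
    2. The C# implementation uses the decimal and Guid types, which are specific to .NET; protobuf-net encodes them as the bcl.Decimal and bcl.Guid messages, not as strings. (As the comments in bcl.proto note, cross-platform code should usually avoid them completely.)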

    Here is the folder structure:

    ├── bcl.proto
    ├── execution.proto
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── protos
        ├── bcl.pb.go
        └── execution.pb.go
    

    bcl.proto:

    This file is copied from github.com/protobuf-net/protobuf-net. It's required because the .NET implementation encodes decimal and Guid values using the Decimal and Guid messages defined in this file.

    // The types in here indicate how protobuf-net represents certain types when using protobuf-net specific
    // library features. Note that it is not *required* to use any of these types, and cross-platform code
    // should usually avoid them completely (ideally starting from a .proto schema)
    
    // Some of these are ugly, sorry. The TimeSpan / DateTime dates here pre-date the introduction of Timestamp
    // and Duration, and the "well known" types should be preferred when possible. Guids are particularly
    // awkward - it turns out that there are multiple guid representations, and I accidentally used one that
    // I can only call... "crazy-endian". Just make sure you check the order!
    
    // It should not be necessary to use bcl.proto from code that uses protobuf-net
    
    syntax = "proto3";
    
    option csharp_namespace = "ProtoBuf.Bcl";
    option go_package = "./protos";
    
    package bcl;
    
    message TimeSpan {
      sint64 value = 1; // the size of the timespan (in units of the selected scale)
      TimeSpanScale scale = 2; // the scale of the timespan [default = DAYS]
      enum TimeSpanScale {
        DAYS = 0;
        HOURS = 1;
        MINUTES = 2;
        SECONDS = 3;
        MILLISECONDS = 4;
        TICKS = 5;
    
        MINMAX = 15; // dubious
      }
    }
    
    message DateTime {
      sint64 value = 1; // the offset (in units of the selected scale) from 1970/01/01
      TimeSpanScale scale = 2; // the scale of the timespan [default = DAYS]
      DateTimeKind kind = 3; // the kind of date/time being represented [default = UNSPECIFIED]
      enum TimeSpanScale {
        DAYS = 0;
        HOURS = 1;
        MINUTES = 2;
        SECONDS = 3;
        MILLISECONDS = 4;
        TICKS = 5;
    
        MINMAX = 15; // dubious
      }
      enum DateTimeKind
      {
         // The time represented is not specified as either local time or Coordinated Universal Time (UTC).
         UNSPECIFIED = 0;
         // The time represented is UTC.
         UTC = 1;
         // The time represented is local time.
         LOCAL = 2;
       }
    }
    
    message NetObjectProxy {
      int32 existingObjectKey = 1; // for a tracked object, the key of the **first** time this object was seen
      int32 newObjectKey = 2; // for a tracked object, a **new** key, the first time this object is seen
      int32 existingTypeKey = 3; // for dynamic typing, the key of the **first** time this type was seen
      int32 newTypeKey = 4; // for dynamic typing, a **new** key, the first time this type is seen
      string typeName = 8; // for dynamic typing, the name of the type (only present along with newTypeKey)
      bytes payload = 10; // the new string/value (only present along with newObjectKey)
    }
    
    message Guid {
      fixed64 lo = 1; // the first 8 bytes of the guid (note:crazy-endian)
      fixed64 hi = 2; // the second 8 bytes of the guid (note:crazy-endian)
    }
    
    message Decimal {
      uint64 lo = 1; // the first 64 bits of the underlying value
      uint32 hi = 2; // the last 32 bits of the underlying value
      uint32 signScale = 3; // the number of decimal digits (bits 1-16), and the sign (bit 0)
    }
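bcl.Decimal only carries the raw parts of a .NET decimal, which is why the output shows RequestedPrice as {lo:10000} rather than a number. A sketch for turning those parts into a decimal string, based only on the field comments in the Decimal message above (lo = low 64 bits and hi = high 32 bits of a 96-bit integer; bit 0 of signScale = sign; bits 1-16 = number of decimal digits). decimalString is a hypothetical helper, not part of protobuf-net or the generated code, and has not been checked against protobuf-net's own decoder.

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// decimalString renders a bcl.Decimal's fields as a base-10 string.
// Assumed layout (from the bcl.proto comments): value = (hi<<64 | lo),
// scale = bits 1-16 of signScale, negative if bit 0 of signScale is set.
func decimalString(lo uint64, hi uint32, signScale uint32) string {
	// Rebuild the 96-bit unsigned magnitude.
	mag := new(big.Int).SetUint64(uint64(hi))
	mag.Lsh(mag, 64)
	mag.Or(mag, new(big.Int).SetUint64(lo))

	digits := mag.String()
	scale := int((signScale >> 1) & 0xFFFF)
	if scale > 0 {
		// Left-pad so at least one digit precedes the decimal point.
		if len(digits) <= scale {
			digits = strings.Repeat("0", scale-len(digits)+1) + digits
		}
		digits = digits[:len(digits)-scale] + "." + digits[len(digits)-scale:]
	}
	if signScale&1 == 1 && mag.Sign() != 0 {
		digits = "-" + digits
	}
	return digits
}

func main() {
	fmt.Println(decimalString(10000, 0, 0))      // 10000 (RequestedPrice in the sample)
	fmt.Println(decimalString(12345, 0, 2<<1))   // 123.45
	fmt.Println(decimalString(12345, 0, 2<<1|1)) // -123.45
	fmt.Println(decimalString(5, 0, 3<<1))       // 0.005
}
```

With the generated code, the call would look like decimalString(d.GetLo(), d.GetHi(), d.GetSignScale()) for d := execution.GetRequestedPrice(). Trailing zeros are kept, mirroring how a .NET decimal preserves its scale.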
    

    execution.proto

    syntax = "proto3";
    
    package execution;
    
    option go_package = "./protos";
    
    import "bcl.proto";
    
    enum OrderStatus {
      Working = 0;
      Rejected = 1;
      Cancelled = 2;
      Completed = 3;
    }
    
    enum OrderType {
      Limit = 0;
      Market = 1;
      StopLimit = 2;
      StopMarket = 3;
    }
    
    enum OrderSide {
      Buy = 0;
      Sell = 1;
    }
    
    enum RejectReason {
      NoRejection = 0;
      InstrumentNotFound = 1;
      OrderNotFound = 2;
      InvalidOrderType = 3;
      InvalidAccount = 4;
      InvalidSide = 5;
      InvalidAmount = 6;
      InvalidLimitPrice = 7;
      InvalidQuoteLimit = 8;
      InvalidActivationPrice = 9;
      InvalidTimeInForce = 10;
      MarketHalted = 11;
      MarketPaused = 12;
      NoCounterOrders = 13;
      MissingExpirationTime = 14;
      IncorrectExpirationTime = 15;
      InternalError = 16;
      IllegalStatusSwitch = 17;
      OrderAlreadyExists = 18;
      InstrumentNotReady = 19;
      ExternalSystemError = 20;
    }
    
    enum ReportCause {
      NONE = 0;
      NewOrder = 1;
      CancelOrder = 2;
      MassCancel = 3;
      Expiration = 4;
      Trigger = 5;
      MarketStatusChange = 6;
    }
    
    enum TimeInForce {
      GoodTillCancel = 0;
      ImmediateOrCancel = 1;
      FillOrKill = 2;
    }
    
    enum CancelReason {
      NotCancelled = 0;
      CancelledByTrader = 1;
      CancelledBySystem = 2;
      SelfMatchPrevention = 3;
      OrderTimeInForce = 4;
      Liquidation = 100;
    }
    
    
    message TradeData {
      int64 TradeId = 1;
      bcl.Decimal Amount = 4;
      bcl.Decimal ExecutionPrice = 5;
      OrderStatus OrderStatus = 7;
      int64 AccountId = 11;
      bcl.Guid MatchedOrderExternalId = 14;
      int64 MatchedOrderId = 16;
      bcl.Decimal RemainingAmount = 17;
    }
    
    message Execution {
      bytes Origin = 4;
      OrderSide Side = 7;
      bcl.Decimal RequestedPrice = 8;
      bcl.Decimal RequestedAmount = 9;
      bcl.Decimal RemainingAmount = 10;
      int64 ExecutedAt = 13;
      OrderStatus OrderStatus = 14;
      repeated TradeData Trades = 16;
      OrderType OrderType = 20;
      int64 Version = 22;
      int64 AccountId = 23;
      RejectReason RejectReason = 25;
      ReportCause ReportCause = 26;
      bcl.Guid InstructionId = 27;
      bcl.Guid ExternalOrderId = 28;
      int32 ExecutionEngineMarketId = 29;
      int64 OrderId = 30;
      CancelReason CancelReason = 31;
      int64 TxId = 32;
      TimeInForce TimeInForce = 34;
      string CancelledBy = 35;
    }
    

    protos/

    The files in this folder are generated from the proto files with this command:

    protoc --go_out=protos --go_opt=paths=source_relative bcl.proto execution.proto
    

    go.mod

    module mymodule.local
    
    go 1.20
    
    require google.golang.org/protobuf v1.30.0
    

    main.go

    package main
    
    import (
        "encoding/binary"
        "log"
    
        "google.golang.org/protobuf/proto"
    
        "mymodule.local/protos"
    )
    
    func main() {
        data := []byte{5, 85, 0, 0, 0, 56, 1, 66, 3, 8, 144, 78, 74, 2, 8, 1, 82, 2, 8, 1, 104, 197, 192, 132, 194, 159, 143, 219, 237, 8, 176, 1, 25, 184, 1, 11, 208, 1, 1, 218, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 226, 1, 18, 9, 133, 66, 138, 247, 239, 67, 93, 77, 17, 176, 192, 189, 75, 170, 203, 186, 145, 232, 1, 1, 240, 1, 25, 128, 2, 25}
        if len(data) < 5 {
            log.Fatal("data should contain at least 5 bytes")
        }
        messageType := data[0]
        length := binary.LittleEndian.Uint32(data[1:5])
        data = data[5:]
        if length != uint32(len(data)) {
            log.Fatalf("invalid data length: %d", length)
        }
        execution := &protos.Execution{}
    
        err := proto.Unmarshal(data, execution)
        if err != nil {
            log.Fatalf("Err unmarshalling!: %v", err)
        }
    
        log.Printf("message type: %d, message: %+v", messageType, execution)
    }
    

    Output for the sample data provided in the question:

    2023/06/15 17:50:58 message type: 5, message: Side:Sell  RequestedPrice:{lo:10000}  RequestedAmount:{lo:1}  RemainingAmount:{lo:1}  ExecutedAt:638223043314917445  Version:25  AccountId:11  ReportCause:NewOrder  InstructionId:{lo:5574686611683820165  hi:10500929413443338416}  ExternalOrderId:{lo:5574686611683820165  hi:10500929413443338416}  ExecutionEngineMarketId:1  OrderId:25  TxId:25
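The Guid fields still print as raw {lo, hi} pairs. bcl.proto warns that this representation is "crazy-endian"; a sketch of the conversion, assuming (this is not stated in the question) that protobuf-net stores the 16 bytes of .NET's Guid.ToByteArray() in lo and hi as two little-endian fixed64 values. In that byte order the first three GUID groups are little-endian and the last two are in display order. guidString is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// guidString converts a bcl.Guid (lo, hi) into the usual 8-4-4-4-12 form.
// Assumption: lo/hi hold Guid.ToByteArray() bytes, read as little-endian
// fixed64 values, so writing them back little-endian recovers those bytes.
func guidString(lo, hi uint64) string {
	var b [16]byte
	binary.LittleEndian.PutUint64(b[0:8], lo)
	binary.LittleEndian.PutUint64(b[8:16], hi)
	// Reverse the first three groups (little-endian in .NET's byte array);
	// the last 8 bytes are already in display order.
	return fmt.Sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%x-%x",
		b[3], b[2], b[1], b[0],
		b[5], b[4],
		b[7], b[6],
		b[8:10], b[10:16])
}

func main() {
	// InstructionId from the output above.
	fmt.Println(guidString(5574686611683820165, 10500929413443338416))
	// f78a4285-43ef-4d5d-b0c0-bd4baacbba91
}
```

With the generated code, the call would be guidString(execution.GetInstructionId().GetLo(), execution.GetInstructionId().GetHi()). For the sample, the result has a well-formed version-4 layout (the third group starts with 4), which is at least a reasonable sanity check on the assumed byte order.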