I am trying to read data from kafka as you can see :
var config = new ConsumerConfig
{
BootstrapServers = ""*******,
GroupId = Guid.NewGuid().ToString(),
AutoOffsetReset = AutoOffsetReset.Earliest
};
MessageParser<AdminIpoChange> parser = new(() => new AdminIpoChange());
using (var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build())
{
consumer.Subscribe("AdminIpoChange");
while (true)
{
AdminIpoChange item = new AdminIpoChange();
var cr = consumer.Consume();
item = parser.ParseFrom(new ReadOnlySpan<byte>(cr.Message.Value).ToArray());
}
consumer.Close();
}
I am using google protobuf
for send and receive data .This code returns this error in parser line:
KafkaConsumer.ConsumeAsync: Protocol message end-group tag did not match expected tag.
Google.Protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
at Google.Protobuf.ParsingPrimitivesMessages.CheckLastTagWas(ParserInternalState& state, UInt32 expectedTag)
at Google.Protobuf.ParsingPrimitivesMessages.ReadGroup(ParseContext& ctx, Int32 fieldNumber, UnknownFieldSet set)
at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(ParseContext& ctx)
at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(UnknownFieldSet unknownFields, ParseContext& ctx)
at AdminIpoChange.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 213
at Google.Protobuf.ParsingPrimitivesMessages.ReadRawMessage(ParseContext& ctx, IMessage message)
at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
at AdminIpoChange.MergeFrom(CodedInputStream input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 188
at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Byte[] data, Boolean discardUnknownFields, ExtensionRegistry registry)
at Google.Protobuf.MessageParser`1.ParseFrom(Byte[] data)
at infrastructure.Queue.Kafka.KafkaConsumer.ConsumeCarefully[T](Func`2 consumeFunc, String topic, String group) in D:\MofidProject\infrastructure\Queue\Kafka\KafkaConsumer.cs:line 168
D:\MofidProject\mts.consumer.plus\bin\Debug\net6.0\mts.consumer.plus.exe (process 15516) exited with code -1001.
To automatically close the console when debugging stops, enable Tools->Options->Debugging->Automatically close the console when debugging stops.'
Updated:
My sample data that comes from Kafka :
- {"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}
My rlc Model :
syntax = "proto3";
message AdminIpoChange
{
string Id =1;
string SymbolName =2;
string SymbolIsin =3;
string Date =4;
string Time=5;
double MinPrice =6;
double MaxPrice =7;
int32 Share =8;
bool Show =9;
int32 Operation =10;
string CreateDateTime=11;
enum AdminIpoOperation
{
Add = 0;
Edit = 1;
Delete = 2;
}
}
My data in bytes :
7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A 22 5C 75 30 36 34 31 5C 75 30 36 32 46 5C 75 30
36 33 31 22 2C 22 53 79 6D 62 6F 6C 49 73 69 6E 22 3A 22 49 52 6F 33 70 7A 41 5A 30 30 30
32 22 2C 22 44 61 74 65 22 3A 22 31 34 30 30 2F 31 32 2F 31 35 22 2C 22 54 69 6D 65 22 3A
22 30 38 3A 30 30 2D 31 32 3A 30 30 22 2C 22 4D 69 6E 50 72 69 63 65 22 3A 31 37 37 32 36
2C 22 4D 61 78 50 72 69 63 65 22 3A 32 31 36 36 36 2C 22 53 68 61 72 65 22 3A 31 30 30 30
2C 22 53 68 6F 77 22 3A 66 61 6C 73 65 2C 22 4F 70 65 72 61 74 69 6F 6E 22 3A 30 2C 22 49
64 22 3A 22 31 30 30 64 38 65 30 62 35 34 31 35 34 65 39 64 39 30 32 30 35 34 62 66 66 31
39 33 65 38 37 35 22 2C 22 43 72 65 61 74 65 44 61 74 65 54 69 6D 65 22 3A 22 32 30 32 32
2D 30 32 2D 32 36 54 30 39 3A 34 37 3A 32 30 2E 30 31 33 34 37 35 37 2B 30 33 3A 33 30 22
7D
The data is definitely not protobuf binary; byte 0 starts a group with field number 15; inside this group is:
after this (at byte 151), an end-group token is encountered with field number 6
There are many striking things about this:
So: there's no doubt about it: that data is ... not what you expect. It certainly isn't valid protobuf binary. Without knowing how that data is stored, all we can do is guess, but on a hunch: let's try decoding it as UTF8 and see what it looks like:
{"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}
or (formatted)
{
"SymbolName":"\u0641\u062F\u0631",
"SymbolIsin":"IRo3pzAZ0002",
"Date":"1400/12/15",
"Time":"08:00-12:00",
"MinPrice":17726,
"MaxPrice":21666,
"Share":1000,
"Show":false,
"Operation":0,
"Id":"100d8e0b54154e9d902054bff193e875",
"CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"
}
Oops! You've written the data as JSON, and you're trying to decode it as binary protobuf. Decode it as JSON instead, and you should be fine. If this was written with the protobuf JSON API: decode it with the protobuf JSON API.