I've configured an EventBridge Pipe to consume events from a self-managed Kafka cluster and forward them to a Step Function.
The topic we are consuming from has Protobuf messages, so the events in the Pipe look like this:
[
  {
    "topic": "<REDACTED>",
    "partition": 0,
    "offset": 226,
    "timestamp": 1694180898000,
    "timestampType": "CREATE_TIME",
    "key": "ABCDVE1BTi0yMDIzLTA5LTA3",
    "value": "AAAAFVwAChJ .......... pZGVudCBub24gAoABAQ==",
    "headers": [],
    "bootstrapServers": "smk://<REDACTED>",
    "eventSource": "SelfManagedKafka",
    "eventSourceKey": "<REDACTED>"
  }
]
I am trying to add an enrichment step to the Pipe that calls a Lambda function to convert the Protobuf message into JSON, so that the Step Function can process it further. The Pipe is therefore: self-managed Kafka source, Lambda enrichment, Step Function target.
In the Lambda (built with Quarkus) I'm trying to convert the String value into the Protobuf model, but I can't get it to work. So far I've tried:
parseFrom with a ByteBuffer:
ByteBuffer wrap = ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
return MyEventProto.MyEvent.parseFrom(wrap);
// Error: com.google.protobuf.InvalidProtocolBufferException:
// Protocol message end-group tag did not match expected tag.
parseDelimitedFrom with the bytes of value:
ByteArrayInputStream inputStream = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
return MyEventProto.MyEvent.parseDelimitedFrom(inputStream);
// Error: com.google.protobuf.InvalidProtocolBufferException:
// While parsing a protocol message, the input ended unexpectedly in the middle of a field.
// This could mean either that the input has been truncated or that an embedded message misreported
// its own length.
parseDelimitedFrom/parseFrom with an InputStream over the Base64-decoded value:
byte[] decode = Base64.getDecoder().decode(value.getBytes(StandardCharsets.UTF_8));
return MyEventProto.MyEvent.parseDelimitedFrom(new ByteArrayInputStream(decode));
// or return MyEventProto.MyEvent.parseFrom(new ByteArrayInputStream(decode));
// No error but it's returning an empty object back,
// similar to what MyEventProto.MyEvent.getDefaultInstance() returns
parseFrom with the raw bytes of the Base64-decoded value:
byte[] decode = Base64.getDecoder().decode(value.getBytes(StandardCharsets.UTF_8));
return MyEventProto.MyEvent.parseFrom(decode);
// Error: com.google.protobuf.InvalidProtocolBufferException:
// Protocol message contained an invalid tag (zero).
Is there a way to parse the protobuf message?
From the docs I figured out that AWS passes the message in Base64-encoded. So I used this approach:
byte[] bytes = Base64.getDecoder().decode(value.getBytes());
ByteBuffer wrap = ByteBuffer.wrap(bytes);
return MyEventProto.MyEvent.parseFrom(wrap);
I then started using https://protobuf-decoder.netlify.app to see whether I could decode the message, and began removing bytes from the start. Removing the first few bytes (AAAAFVwA) and the padding (==) worked! I still can't figure out what AWS adds at the beginning of the message that makes it impossible to parse directly.
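To see what was actually removed, the eight Base64 characters AAAAFVwA can be decoded on their own. This is a minimal sketch using only the JDK; the names PrefixDump and hex are made up for illustration and are not part of any library.

```java
import java.util.Base64;

final class PrefixDump {
    // Decodes a Base64 string and renders each byte as two lowercase hex digits.
    static String hex(String base64) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The prefix that had to be removed before the online decoder accepted the message.
        System.out.println(hex("AAAAFVwA")); // 00 00 00 15 5c 00
    }
}
```

The prefix is six bytes and starts with 0x00. Read as a Protobuf tag, a zero byte means field number 0, which Protobuf does not allow; that matches the "invalid tag (zero)" error from calling parseFrom on the decoded bytes. Read as a length prefix, it means an empty message, which would explain why parseDelimitedFrom returned a default instance.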
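It turns out the extra bytes come from the producer side, not from AWS: the Confluent Schema Registry serializer prefixes each message with a magic byte (0), a 4-byte schema ID and, for Protobuf, a list of message indexes, so that deserializers can look up the right schema. In my edit above, manually removing those extra bytes allowed me to parse the message without using a deserializer.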
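The manual removal can also be done in code instead of by hand. The following is a rough sketch, assuming the record was written by Confluent's Schema Registry Protobuf serializer, whose framing (as far as I understand it) is: one magic byte 0, a 4-byte big-endian schema ID, then the message indexes as zigzag varints, where a single 0 byte stands for "first message type in the schema". The class ConfluentFraming and its members are hypothetical names for this illustration, not a Confluent API.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

final class ConfluentFraming {
    final int schemaId;          // ID of the schema in the registry
    final int[] messageIndexes;  // path to the message type inside the schema
    final byte[] payload;        // the plain Protobuf bytes

    private ConfluentFraming(int schemaId, int[] messageIndexes, byte[] payload) {
        this.schemaId = schemaId;
        this.messageIndexes = messageIndexes;
        this.payload = payload;
    }

    // Splits a Base64-encoded, schema-framed record value into header and payload.
    static ConfluentFraming split(String base64Value) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getDecoder().decode(base64Value));
        if (buf.remaining() < 6) {
            throw new IllegalArgumentException("Value too short to carry a schema header");
        }
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unexpected magic byte, value is not schema-framed");
        }
        int schemaId = buf.getInt(); // ByteBuffer reads big-endian by default

        // A count of 0 is shorthand for the single index [0].
        int count = readZigZagVarint(buf);
        if (count < 0) {
            throw new IllegalArgumentException("Negative message index count");
        }
        int[] indexes = new int[count == 0 ? 1 : count];
        for (int i = 0; i < count; i++) {
            indexes[i] = readZigZagVarint(buf);
        }

        // Everything after the header is the Protobuf message itself.
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new ConfluentFraming(schemaId, indexes, payload);
    }

    // Reads a base-128 varint (at most 5 bytes) and undoes the zigzag encoding.
    private static int readZigZagVarint(ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is too long");
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }
}
```

With the payload in hand, MyEventProto.MyEvent.parseFrom(ConfluentFraming.split(value).payload) should then parse the event from the question. This skips the schema lookup entirely, so it only suits cases where the message type is known up front and the schema does not change underneath you.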
For a proper solution, first construct an io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer:
String schemaRegistryUrl = "http://schema.registry:port";
// Settings for the deserializer: registry location and the types to deserialize into.
Map<String, Object> protoDeserialiserProperties = Map.of(
    KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl,
    KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Message.class,
    KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE, String.class
);
// Schema Registry client with a cache capacity of 10 that understands Protobuf schemas.
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient(
    schemaRegistryUrl, 10, List.of(new ProtobufSchemaProvider()), Collections.emptyMap());
KafkaProtobufDeserializer<Message> deserializer =
    new KafkaProtobufDeserializer<>(registryClient, protoDeserialiserProperties);
using this dependency:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-protobuf-serializer</artifactId>
    <version>${latest.version}</version>
</dependency>
And then parse the message:
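// The Pipe delivers the record value Base64-encoded, so decode it first.
byte[] bytes = Base64.getDecoder().decode(value.getBytes());
// The deserializer reads the schema header and resolves the schema via the registry.
Message deserialized = deserializer.deserialize("my-topic", bytes);
// Re-parse the generic Message into the generated type.
MyEventProto.MyEvent offersByEvent = MyEventProto.MyEvent.parseFrom(deserialized.toByteString());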