apache-kafka, apache-beam, apache-beam-io, protobuf-java, apache-beam-kafkaio

How to consume Kafka messages with a protobuf definition in Apache Beam?


I'm using the KafkaIO unbounded source in an Apache Beam pipeline running on Dataflow. The following configuration works for me:

Map<String, Object> kafkaConsumerConfig = new HashMap<String, Object>() {{
  put("auto.offset.reset", "earliest");
  put("group.id", "my.group.id");
}};

p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())
// do something

Now, since I have a protobuf definition for the messages in my topic, I would like to use it to convert the Kafka records into Java objects.

The following configuration, however, doesn't work and fails because a Coder is required:

p.apply(KafkaIO.<String, Bytes>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(BytesDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())

Unfortunately, I cannot figure out the right value deserializer + Coder combination, and I cannot find similar examples in the documentation. Do you have any working examples of using protobuf with the Kafka source in Apache Beam?


Solution

  • If you want to deserialize protobuf messages, you need a custom class that implements the Deserializer interface from the Apache Kafka client library. This deserialization step is the same for any consumer of the topic; it is not specific to Apache Beam.

    To get the Apache Kafka client library, add org.apache.kafka:kafka-clients as a dependency.

    Then create the custom deserializer class, which would look something like:

    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class ProtoDeserializer implements Deserializer<YourProtoClass> {
        @Override
        public YourProtoClass deserialize(String topic, byte[] data) {
            // Kafka passes null for tombstone records; pass it through unchanged.
            if (data == null) return null;

            try {
                // Parse the raw bytes with the protobuf-generated class.
                return YourProtoClass.parseFrom(data);
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Failed to parse protobuf record from topic " + topic, e);
            }
        }
    }
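
    As a quick sanity check before wiring it into Beam, you can exercise the deserializer directly. This is only a sketch; YourProtoClass stands in for your generated protobuf class:

    // Hypothetical round trip: serialize a proto message, then decode it
    // the same way the Kafka consumer would.
    YourProtoClass original = YourProtoClass.newBuilder().build();
    byte[] payload = original.toByteArray();
    YourProtoClass decoded = new ProtoDeserializer().deserialize("my.topic", payload);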
    

    Then use that class in your pipeline:

     p.apply(KafkaIO.<String, YourProtoClass>read()
            .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
            .withConsumerConfigUpdates(kafkaConsumerConfig)
            .withTopic("my.topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ProtoDeserializer.class)
            .withMaxNumRecords(10)
            .withoutMetadata())
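
    If Beam still cannot infer a Coder for YourProtoClass, one option (a sketch, assuming the beam-sdks-java-extensions-protobuf module is on the classpath) is to supply the coder explicitly via withValueDeserializerAndCoder together with ProtoCoder:

     p.apply(KafkaIO.<String, YourProtoClass>read()
            .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
            .withConsumerConfigUpdates(kafkaConsumerConfig)
            .withTopic("my.topic")
            .withKeyDeserializer(StringDeserializer.class)
            // Provide the coder explicitly instead of relying on inference.
            .withValueDeserializerAndCoder(ProtoDeserializer.class, ProtoCoder.of(YourProtoClass.class))
            .withMaxNumRecords(10)
            .withoutMetadata())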