Tags: scala, apache-flink, apache-pulsar, protocol-buffers

How should I define Flink's schema to read Protocol Buffers data from Pulsar?


I am using Pulsar-Flink to read data from Pulsar in Flink, but I am having difficulty when the data is in Protocol Buffers format.

The example on the project's GitHub page uses SimpleStringSchema, but that does not seem to support Protocol Buffers. Does anyone know how to handle this data format? How should I define the schema?

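    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;

    // serviceUrl and adminUrl (Pulsar broker and admin endpoints) are defined elsewhere
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties props = new Properties();
    props.setProperty("topic", "test-source-topic");
    FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);

    DataStream<String> stream = see.addSource(source);

    // chain operations on the DataStream<String> and sink the output
    // end method chaining

    see.execute();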
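One way to see why SimpleStringSchema does not fit: it turns each message's bytes into a String (UTF-8 by default), and protobuf's binary wire format is generally not valid UTF-8, so that conversion can be lossy. The minimal sketch below uses only the Java standard library; the class name Utf8RoundTrip is hypothetical, and the three bytes are the protobuf encoding of a message whose field 1 holds the varint 150.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing what happens when protobuf bytes are treated as text.
final class Utf8RoundTrip {
    // Protobuf wire bytes for a message whose field 1 holds the varint 150:
    // 0x08 is the tag (field 1, wire type 0) and 0x96 0x01 encode 150.
    static final byte[] PROTO_BYTES = {0x08, (byte) 0x96, 0x01};

    // The conversion SimpleStringSchema performs by default: bytes -> UTF-8 String.
    static String asString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Turning the String back into bytes, as a later parseFrom call would need.
    static byte[] backToBytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }
}
```

Here 0x96 is not a valid UTF-8 sequence on its own, so the decoder replaces it with U+FFFD; re-encoding yields the five bytes 08 EF BF BD 01 instead of the original three, and the value 150 is gone. Reading the raw bytes in a custom DeserializationSchema avoids the String detour entirely.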

FYI, I am writing Scala, so an explanation in Scala (rather than Java) would be especially helpful. That said, any advice is welcome, including Java.


Solution

  • You should implement your own DeserializationSchema. Assume you have a protobuf message Address and have generated the corresponding Java class with protoc. Then the schema could look like the following:

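    import java.io.IOException;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    // Also import Address, the class protoc generated for your message.

    // Turns the raw bytes of each Pulsar message into a protobuf Address.
    public class ProtoDeserializer implements DeserializationSchema<Address> {
        @Override
        public TypeInformation<Address> getProducedType() {
            // Tells Flink which type this source emits.
            return TypeInformation.of(Address.class);
        }

        @Override
        public Address deserialize(byte[] message) throws IOException {
            // parseFrom is generated by protoc; it throws
            // InvalidProtocolBufferException (an IOException) on malformed input.
            return Address.parseFrom(message);
        }

        @Override
        public boolean isEndOfStream(Address nextElement) {
            // The topic is unbounded, so never signal end of stream.
            return false;
        }
    }

    // Usage in the question's code:
    // FlinkPulsarSource<Address> source =
    //     new FlinkPulsarSource<>(serviceUrl, adminUrl, new ProtoDeserializer(), props);
    // DataStream<Address> stream = see.addSource(source);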
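A possible variation, not part of the original answer, is a single generic schema that takes the generated class's parseFrom as a method reference, so one class covers every message type. Because DeserializationSchema extends Serializable and Flink serializes the schema along with the source when the job is submitted, the stored parser must be serializable too. To keep this sketch compilable without Flink or protobuf on the classpath, ProtoParser, GenericProtoDeserializer, and JavaSerialization are hypothetical names, and GenericProtoDeserializer only mirrors the deserialize method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical byte[] -> T parser that is also Serializable, so a method
// reference such as Address::parseFrom can live inside a shipped schema.
@FunctionalInterface
interface ProtoParser<T> extends Serializable {
    T parse(byte[] bytes) throws IOException;
}

// Plain-Java stand-in for the deserialize() part of a DeserializationSchema.
// A real version would implement DeserializationSchema<T> and also keep a
// Class<T> to build the TypeInformation<T> returned by getProducedType().
class GenericProtoDeserializer<T> implements Serializable {
    private final ProtoParser<T> parser;

    GenericProtoDeserializer(ProtoParser<T> parser) {
        this.parser = parser;
    }

    T deserialize(byte[] message) throws IOException {
        return parser.parse(message);
    }
}

final class JavaSerialization {
    // Writes and reads an object with Java serialization, roughly what
    // happens when Flink ships a source and its schema to the task managers.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Address::parseFrom would fit ProtoParser<Address> directly, since the generated parseFrom(byte[]) throws InvalidProtocolBufferException, a subclass of IOException. If the parser field were a non-serializable object instead, Flink would typically reject the schema at job submission.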