Tags: deserialization, apache-flink, kafka-consumer-api, flink-streaming

Apache Flink: Read data from Kafka as byte array


How can I read data from Kafka in byte[] format?

I have an implementation that reads events as String with SimpleStringSchema(), but I couldn't find a schema that reads the data as byte[].

Here is my code:

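    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kafka1:9092");
    // Only the Kafka 0.8 consumer uses ZooKeeper; the 0.10 consumer does not need this
    properties.setProperty("zookeeper.connect", "zookeeper1:2181");
    properties.setProperty("group.id", "test");
    // Flink's 0.9+/0.10 consumer overrides both deserializers with ByteArrayDeserializer
    // and hands the raw bytes to the DeserializationSchema, so these two lines have no effect
    properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    properties.setProperty("auto.offset.reset", "earliest");
    DataStream<byte[]> stream = env
                // which DeserializationSchema<byte[]> goes in place of "?"
                .addSource(new FlinkKafkaConsumer010<byte[]>("testStr", ? ,properties));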

Solution

  • Finally, I found that an anonymous AbstractDeserializationSchema<byte[]> that simply returns its input does the job:

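    // Package for Flink 1.4+; older releases use org.apache.flink.streaming.util.serialization
    import java.io.IOException;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    DataStream<byte[]> stream = env
                .addSource(new FlinkKafkaConsumer010<>("testStr", new AbstractDeserializationSchema<byte[]>() {
                    // Identity schema: the consumer passes the raw record value in, so return it as-is
                    @Override
                    public byte[] deserialize(byte[] bytes) throws IOException {
                        return bytes;
                    }
                }, properties));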
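To illustrate why an identity schema is all that is needed, here is a plain-Java sketch with no Flink dependency. ValueSchema is a hypothetical interface that only mirrors the deserialize(byte[]) shape of Flink's DeserializationSchema; it is not Flink's API. RAW_BYTES plays the role of the pass-through schema above, and UTF8_STRING approximates what SimpleStringSchema does by default (decode the bytes as UTF-8). The demo shows that an arbitrary binary value survives the pass-through unchanged, but is not guaranteed to survive a trip through String.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PassThroughSchemaDemo {

    // Hypothetical stand-in that only mirrors the deserialize(byte[]) shape of
    // Flink's DeserializationSchema<T>; it is NOT the Flink interface itself.
    interface ValueSchema<T> {
        T deserialize(byte[] message);
    }

    // Mirrors the pass-through schema: hand back the raw record value unchanged.
    static final ValueSchema<byte[]> RAW_BYTES = message -> message;

    // Approximates SimpleStringSchema's default behaviour: decode as UTF-8.
    static final ValueSchema<String> UTF8_STRING =
            message -> new String(message, StandardCharsets.UTF_8);

    public static void main(String[] args) {
        // A binary record value: 0xCA 0xFE is not valid UTF-8, followed by "AB".
        byte[] recordValue = {(byte) 0xCA, (byte) 0xFE, 0x41, 0x42};

        byte[] raw = RAW_BYTES.deserialize(recordValue);
        System.out.println("pass-through identical: " + Arrays.equals(raw, recordValue));

        // Decoding replaces malformed bytes with U+FFFD, so re-encoding the
        // String does not give back the original bytes.
        String text = UTF8_STRING.deserialize(recordValue);
        byte[] roundTrip = text.getBytes(StandardCharsets.UTF_8);
        System.out.println("string round trip identical: " + Arrays.equals(roundTrip, recordValue));
    }
}
```

Returning the array itself avoids a copy; if the payload is text after all, the decoding can then be done explicitly downstream with a charset you choose.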