Tags: apache-kafka, snappy, apache-flink

Flink: getting byte[] data from Kafka


I'm using flink-1.0-SNAPSHOT to consume data from Kafka. The data arrives as a Snappy-compressed byte[] that gets passed to Thrift for later use.

When I use Flink to retrieve the data, it gets corrupted or mishandled somehow such that it can't be decompressed. The code is derived from this sample and is as follows:

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<String, String>() {

    @Override
    public String map(String value) throws Exception {
        // check whether the bytes recovered from the String form a valid Snappy buffer
        boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
        return String.valueOf(bvalid);
    }
});

The isValidCompressedBuffer check returns false every time.

The data is known to be good when consumed via other avenues.

What did I miss?


Solution:

I'm posting this because I couldn't find any examples that used RawSchema.
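RawSchema itself isn't shown in the original post. A minimal sketch of what it might look like, assuming Flink 1.0's DeserializationSchema interface, is a schema that hands the Kafka message bytes through untouched:

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class RawSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        // pass the raw Kafka message bytes through untouched
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        // the stream never ends on its own
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}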

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    // consume the topic as raw byte[] messages instead of Strings
    DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));

    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            // the untouched bytes now validate as a Snappy buffer
            boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
            return bvalid;
        }
    }).print();

    env.execute();
}
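Since the question says the payloads are Snappy-compressed Thrift objects, the map body could then go on to decompress and deserialize them. A rough sketch, where MyEvent stands in for a hypothetical generated Thrift struct:

import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.xerial.snappy.Snappy;

// hypothetical helper: undo the Snappy compression, then let Thrift rebuild the object
public static MyEvent decode(byte[] compressed) throws Exception {
    byte[] raw = Snappy.uncompress(compressed);   // throws if the buffer is not valid Snappy data
    MyEvent event = new MyEvent();                // placeholder for your generated Thrift class
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(event, raw);
    return event;
}

Which TProtocol factory to pass depends on how the producer serialized the objects; TBinaryProtocol above is an assumption.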

Solution

  • Reading byte-messages as String is incorrect. You should read the bytes as-is and then decompress:

    public Object map(byte[] bytes) throws Exception {
        boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
        // then decompress, e.g. with Snappy.uncompress(bytes)
        ...
    }
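To see why the String detour breaks the data: decoding arbitrary binary bytes into a String is lossy, because byte sequences that are invalid in the charset get replaced, so getBytes() cannot recover the original buffer. A small standalone check (the byte values are arbitrary; any invalid UTF-8 sequence will do):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LossyRoundTrip {
    public static void main(String[] args) {
        // 0x82 and 0x89 are UTF-8 continuation bytes with no lead byte,
        // so decoding replaces them with U+FFFD and the originals are lost
        byte[] original = {(byte) 0x82, 0x01, (byte) 0x89, 0x00};
        String asString = new String(original, StandardCharsets.UTF_8);
        byte[] roundTripped = asString.getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(original, roundTripped)); // prints false
    }
}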