java apache-kafka kryo

Deserializing Java objects from Kafka consumer


I have a Kafka Consumer, currently configured with:

kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

But what I really want is to be able to use a Kryo Deserializer instead:

public class KryoPOJODeserializer<T> implements Deserializer<T> {

    private final Kryo kryo = new Kryo();
    // T.class is not legal Java (generics are erased), so the target class is passed in.
    private final Class<T> type;

    public KryoPOJODeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Deserialize the serialized object into the configured target class.
        return kryo.readObject(new Input(data), type);
    }

    @Override
    public void close() {
        // Nothing to release.
    }

}

What I can't figure out is whether I can reuse the same Consumer for different topics (each topic carries a different type of POJO) if my consumer config is:

kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());

Or, do I have to have a separate Consumer for every topic?

Alternatively, do I have to remove the generics part of my Deserializer, always return an Object, and cast the Object into the relevant POJO in the client code? Something like:

public class KryoPOJODeserializer implements Deserializer<Object> {

    private Kryo kryo = new Kryo();

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
        kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() );
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        // Deserialize the serialized object.
        return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
    }

    @Override
    public void close() {

    }

}

The latter would work but it feels a bit dirty.

Any suggestions appreciated!


Solution

  • You can use your original approach by passing Deserializer instances directly into the Consumer:

    KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties,
        new StringDeserializer(), new KryoPOJODeserializer<>(Foo.class));
    

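    Note that the consumer does not call configure() on deserializers that are passed in directly, so any Kryo setup done there has to move into the deserializer's constructor.

    If several topics carry the same value type, a single consumer can subscribe to all of them. If the topics carry different value types, you will need one consumer per type, because a consumer has exactly one value deserializer.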

    Otherwise, your second approach (returning Object and casting in the client code) is also valid.
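
For the second approach, the client-side casting could look roughly like the sketch below. It is only an illustration: Foo and Bar are hypothetical POJOs, and plain java.io serialization stands in for Kryo's writeClassAndObject/readClassAndObject so that the example runs without Kafka or Kryo on the classpath. The point is the handle method, which checks the runtime type of the deserialized value before casting.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ObjectValueDispatch {

    // Hypothetical POJOs, one per topic.
    public static class Foo implements Serializable {
        final String name;
        public Foo(String name) { this.name = name; }
    }

    public static class Bar implements Serializable {
        final int count;
        public Bar(int count) { this.count = count; }
    }

    // Stand-in for the producer side (Kryo's writeClassAndObject in the real setup):
    // the class information is written together with the object.
    public static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Stand-in for deserialize(topic, data): the payload names its own class,
    // so the result can only be typed as Object.
    public static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Client code: check the runtime type, then cast to the relevant POJO.
    public static String handle(Object value) {
        if (value instanceof Foo) {
            Foo foo = (Foo) value;
            return "Foo:" + foo.name;
        }
        if (value instanceof Bar) {
            Bar bar = (Bar) value;
            return "Bar:" + bar.count;
        }
        throw new IllegalArgumentException("Unexpected value type: " + value);
    }

    public static void main(String[] args) {
        System.out.println(handle(deserialize(serialize(new Foo("a")))));
        System.out.println(handle(deserialize(serialize(new Bar(3)))));
    }
}
```

In a real consumer loop the same instanceof checks would be applied to record.value(); alternatively, record.topic() could be used to decide which cast to apply.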