Search code examples
thriftproducer-consumerapache-kafka

configure kafka to send custom type data


I am pretty new to kafka

I have created a producer and consumer greoup using official examples although I want to send thrift bundles from producer and the consumer to get the bundles and store in a bundle array.

I have written the producer side code as

KeyedMessage<String, Bundle> data = new KeyedMessage<String, Bundle>("bundles", "Bundle", bundle); 
        producer.send(data);

But on the consumer side I have

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(NO_OF_THREADS));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> messageStreams = consumerMap.get(topic);

Can I make kafka consumer to get data as bundle type rather than byte[] array.


Solution

  • You can use the following method to decode values directly into Bundle type:

    public interface kafka.javaapi.consumer.ConsumerConnector {
      ...
      public <K,V> Map<String, List<KafkaStream<K,V>>> 
         createMessageStreams(
             Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
    

    In this case you need to implement your own valueDecoder of type Decoder<Bundle> using corresponding Thrift API.

    See High Level Consumer API description in Kafka docs.