I am pretty new to kafka
I have created a producer and consumer greoup using official examples although I want to send thrift bundles from producer and the consumer to get the bundles and store in a bundle array.
I have written the producer side code as
KeyedMessage<String, Bundle> data = new KeyedMessage<String, Bundle>("bundles", "Bundle", bundle);
producer.send(data);
But on the consumer side I have
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(NO_OF_THREADS));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> messageStreams = consumerMap.get(topic);
Can I make kafka consumer to get data as bundle type rather than byte[] array.
You can use the following method to decode values directly into Bundle type:
public interface kafka.javaapi.consumer.ConsumerConnector {
...
public <K,V> Map<String, List<KafkaStream<K,V>>>
createMessageStreams(
Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
In this case you need to implement your own valueDecoder
of type Decoder<Bundle>
using corresponding Thrift API.
See High Level Consumer API description in Kafka docs.