I have the Kafka Streams code below:
public class KafkaStreamHandler implements Processor<String, String>{
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
// TODO Auto-generated method stub
this.context = context;
}
public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {
Headers contexts = context.headers();
contexts.forEach(header -> System.out.println(header));
}
public void StartFailstreamHandler() {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "500");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//consumer_timeout_ms
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
props.put("state.dir", "/tmp/kafka/stat");
userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
/* make a few decisions based on the header */
/* how do I get the header? */
userStream.map(this::process);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
}
});
kafkaStreams.cleanUp();
kafkaStreams.start();
}
}
Now one of our clients is sending version info in the Kafka headers, like below:
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
record.headers().add(new RecordHeader("version", "v1".getBytes()));
producer.send(record);
Based on this header I need to choose a parser for my message. How can I read this header using a KStream operator? I have looked through the whole Streams API, but no method exposes the headers.
I cannot switch to a plain Kafka consumer, as my application already depends on a few KStream APIs.
A Processor does not let you chain further operators downstream in the DSL. Use transformValues instead, so you can keep using the Streams DSL:
public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public String transform(String readOnlyKey, String value) {
Headers headers = context.headers();
/* Make decisions based on the headers here. To filter on a header, return null and chain a filter operator after transformValues. */
return value;
}
@Override
public void close() {
}
}
userStream
.transformValues(ExtractHeaderThenDoSomethingTransformer::new)
.map(this::process);
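Inside transform, the raw bytes of the client's header should be available through context.headers().lastHeader("version"), which returns null when the header is absent, and value() on the result gives the byte[]. From there, choosing a parser could look roughly like the sketch below. It is plain Java with no Kafka dependency, so it can be tried in isolation. The class name VersionedParserSelector, the two placeholder parsers, and the fallback to "v1" are all assumptions made for illustration, not part of the original code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

public class VersionedParserSelector {

    // Hypothetical parsers keyed by header value; real ones would build domain objects.
    private static final Map<String, Function<String, String>> PARSERS = Map.of(
            "v1", value -> "v1-parsed:" + value,
            "v2", value -> "v2-parsed:" + value);

    // Assumption: records without a version header are treated as v1.
    public static final String DEFAULT_VERSION = "v1";

    // versionHeader is the raw header value (a byte[]), or null if the header is missing.
    public static String parse(byte[] versionHeader, String value) {
        String version = versionHeader == null
                ? DEFAULT_VERSION
                // Decode with an explicit charset instead of the platform default.
                : new String(versionHeader, StandardCharsets.UTF_8);

        Function<String, String> parser = PARSERS.get(version);
        if (parser == null) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        return parser.apply(value);
    }

    public static void main(String[] args) {
        System.out.println(parse("v1".getBytes(StandardCharsets.UTF_8), "message"));
        System.out.println(parse("v2".getBytes(StandardCharsets.UTF_8), "message"));
        System.out.println(parse(null, "message"));
    }
}
```

In the transformer, transform would then return something like VersionedParserSelector.parse(headerBytes, value). Note that the producer in the question calls "v1".getBytes() with the platform default charset, so decoding as UTF-8 assumes both sides agree on the encoding.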