I am using Kafka 2.6 with spring cloud stream kafka-streams binder. I want to access record headers, partition no etc in my Kafka streams application. I read about using Processor API, using ProcessorContext etc. But everytime ProcessorContext object is coming null.
Below is the code
@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
{
public Transformer<String, String, KeyValue<String, String>> get()
{
return new Transformer<String, String, KeyValue<String, String>>()
{
private int total = 0;
ProcessorContext context;
@Override
public void close() {
}
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
@Override
public KeyValue<String, String> transform(String k, String v)
{
System.out.println("ProcessorContext: "+this.context);
System.out.println("value: "+v);
return new KeyValue<>(k, v);
}
};
}
});
}
In this code ProcessorContext is always printed as null. I also tried using ListenerContainerCustomizer for spring-boot. But that also is not working
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
return (container, dest, group) ->
{
container.setRecordInterceptor(record ->
{
System.out.println(">>>> Received record, checking headers");
Headers headers = record.headers();
System.out.println(">>>> Header length: " + headers.toArray().length);
for (Header header : headers)
{
if (header.key().equalsIgnoreCase("eventtype"))
{
String value = String.valueOf(header.value());
if (!value.equalsIgnoreCase("PUBLISHED"))
{
System.out.println("Event type from header not PUBLISHED, skipping record");
return null;
}
}
}
System.out.println("Processing record");
return record;
});
};
}
I printed list of beans registered with beans I could see above one. But it never works. I anyways require first approach to work since I like to run some business logic with partition number.
Please help badly stuck since many days.
Please change
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = context;
}
To
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
{
this.context = pc;
}
this.context = context // both are same, looks like a typo.