Tags: java, apache-kafka, apache-kafka-streams, spring-cloud-stream-binder-kafka

Access record's partition number in Kafka Streams


I am using Kafka 2.6 with the Spring Cloud Stream Kafka Streams binder. I want to access record metadata such as headers and the partition number in my Kafka Streams application. I read that this can be done with the Processor API through ProcessorContext, but the ProcessorContext object is always null.

Below is the code:

@StreamListener(Bindings.input)
@SendTo(Bindings.output)
public KStream<String, String> process(KStream<String, String> input)
{
    return input.transform(new TransformerSupplier<String, String, KeyValue<String, String>>()
    {
        public Transformer<String, String, KeyValue<String, String>> get() 
        {
            return new Transformer<String, String, KeyValue<String, String>>() 
            {

                private int total = 0;
                ProcessorContext context;

                @Override
                public void close() {
                }

                @Override
                public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
                {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String k, String v)
                {
                    System.out.println("ProcessorContext: "+this.context);
                    System.out.println("value: "+v);
                    return new KeyValue<>(k, v);
                }
            };
        }
    });
}

In this code, ProcessorContext is always printed as null. I also tried using a ListenerContainerCustomizer in Spring Boot, but that does not work either:

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer()
{
    return (container, dest, group) ->
    {
        container.setRecordInterceptor(record ->
        {
            System.out.println(">>>> Received record, checking headers");
            Headers headers = record.headers();
            System.out.println(">>>> Header length: " + headers.toArray().length);
            for (Header header : headers)
            {
                if (header.key().equalsIgnoreCase("eventtype"))
                {
                    String value = String.valueOf(header.value());
                    if (!value.equalsIgnoreCase("PUBLISHED"))
                    {
                        System.out.println("Event type from header not PUBLISHED, skipping record");
                        return null;
                    }
                }
            }
            System.out.println("Processing record");
            return record;
        });
    };
}

I printed the list of registered beans and could see the customizer above, but it never takes effect. In any case, I need the first approach to work, since I want to run some business logic based on the partition number.

Please help; I have been stuck on this for several days.


Solution

  • Please change

    @Override
    public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
    {
        this.context = context;
    }

    to

    @Override
    public void init(org.apache.kafka.streams.processor.ProcessorContext pc)
    {
        this.context = pc;
    }

    In the original, this.context = context assigns the field to itself, because the method parameter is named pc, not context. The field is therefore never set and stays null. It looks like a typo.
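    To see the effect of the typo in isolation, here is a minimal sketch that runs without Kafka on the classpath. RecordContext is a hypothetical stand-in for ProcessorContext, reduced to a single partition() method, and the two transformer classes are illustrative names, not part of any library. In the real ProcessorContext, partition(), headers(), topic() and offset() are available inside transform() once init() has stored the context correctly.

```java
import java.util.Objects;

public class ContextInitDemo {

    // Hypothetical stand-in for Kafka Streams' ProcessorContext (assumption:
    // only partition() is modelled here; the real interface has many more methods).
    interface RecordContext {
        int partition();
    }

    // Mirrors the question: the parameter is named pc, so the bare name
    // "context" resolves to the field and the assignment changes nothing.
    static class BrokenTransformer {
        private RecordContext context;

        void init(RecordContext pc) {
            this.context = context; // self-assignment: the field stays null
        }

        String transform(String key, String value) {
            return "context=" + context;
        }
    }

    // Mirrors the fix from the answer: store the parameter that was passed in.
    static class FixedTransformer {
        private RecordContext context;

        void init(RecordContext pc) {
            this.context = Objects.requireNonNull(pc, "pc");
        }

        String transform(String key, String value) {
            // With a real ProcessorContext, context.partition() is read the same way.
            return value + "@partition-" + context.partition();
        }
    }

    public static void main(String[] args) {
        RecordContext ctx = () -> 3; // pretend the current record came from partition 3

        BrokenTransformer broken = new BrokenTransformer();
        broken.init(ctx);
        System.out.println(broken.transform("k", "v")); // prints "context=null"

        FixedTransformer fixed = new FixedTransformer();
        fixed.init(ctx);
        System.out.println(fixed.transform("k", "v")); // prints "v@partition-3"
    }
}
```

    Naming the parameter the same as the field (context) and keeping this.context = context would also work, since the parameter would then shadow the field on the right-hand side.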