Tags: java, spring, apache-kafka, kafka-consumer-api, spring-kafka

Read topics from a configuration object in @KafkaListener


I have a Spring Boot application with the following listener:

@KafkaListener(id = "demo", topics = "demo",
        containerFactory = "retryKafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws Exception {

}

I have an Apache Configuration object that I want to use to read the topics from properties. I know I can use property placeholders for that, but the configuration I use has some logic inside, so I want to read only from that configuration object. It is injected as follows:

@Inject
private Configuration configuration;

I can get the topics with configuration.getString("kafka.consumer.topic"). I tried topics = "#{configuration.getString('kafka-generic.consumer.topics')}" in the topics field of the @KafkaListener annotation, but I get the following error:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'configuration' cannot be found on object of type 'org.springframework.beans.factory.config.BeanExpressionContext' - maybe not public?
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:224)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:51)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:242)
    at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:161)
    ... 23 common frames omitted

Can someone tell me how to use configuration.getString("kafka.consumer.topic") in the topics field of the @KafkaListener annotation?


Solution

  • "getting errors."

    That is never enough for a question like this; you have to show the actual error.

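    You can use SpEL: topics = "#{@somebean.someProperty}" or topics = "#{@somebean.getString('...')}", where somebean is the name of a bean in the application context. The @ prefix tells SpEL to resolve a bean reference.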
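
    Applied to the listener from the question, the suggestion might look roughly like the sketch below. It assumes the Configuration object is registered in the application context as a bean named "configuration" (the bean name is an assumption; use whatever name it actually has) and reuses the key 'kafka.consumer.topic' from the question. The DemoListener class name is hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class DemoListener {

    // Assumption: a bean named "configuration" exists in the application context.
    // "@configuration" is a SpEL bean reference; getString(...) is called on that bean
    // and its result is used as the topic name.
    @KafkaListener(id = "demo",
            topics = "#{@configuration.getString('kafka.consumer.topic')}",
            containerFactory = "retryKafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, String> consumerRecord,
                        Acknowledgment acknowledgment) throws Exception {
        // Process the record here, then acknowledge it.
        acknowledgment.acknowledge();
    }
}
```

    The expression is resolved against beans in the application context, not against fields of the listener class, so the injected field "private Configuration configuration" is not what the expression sees. The bean name in the expression has to match the name under which the Configuration object is registered.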