Search code examples
springspring-bootspring-kafkaspring-elproperty-placeholder

Spring Expression Language issue


I have the following class. I have verified in the console, the constructor of this class is called(during bean creation) before resolving the topic placeholder value in Kafka listener:

public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements 
MessageReceiver<MSG> {

@SuppressWarnings("unused")
private String topic;

public MsgReceiver(String topic, MessageHandler<MSG> handler) {
    super(handler);
    this.topic = topic;
}

@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
    System.out.println("Received "+payload);
    super.receiveMessage(headers, payload);
}

}

I have my application.yml as follows:

my:
  messenger:
    kafka:
      address: localhost:9092
      topics:
        topic_1:
          value: my_topic
          groupId: 1

During bean creation, I pass "topic_1" which I want should dynamically be used inside Kafka listener topic placeholder. I tried as shown in the code itself, but it does not work. Please suggest how to do that.


Solution

  • Placeholders are resolved before SpEL is evaluated; you can't dynamically build a placeholder name using SpEL. Also, you can't reference fields like that; you have to do it indirectly via the bean name (and a public getter).

    So, to do what you want, you have to add a getter and get the property dynamically from the environment after building the property name with SpEL.

    There is a special token __listener which allows you to reference the current bean.

    Putting it all together...

    @SpringBootApplication
    public class So63056065Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So63056065Application.class, args);
        }
    
        @Bean
        public MyReceiver receiver() {
            return new MyReceiver("topic_1");
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
        }
    }
    
    class MyReceiver {
    
        private final String topic;
    
        public MyReceiver(String topic) {
            this.topic = topic;
        }
    
        public String getTopic() {
            return this.topic;
        }
    
        @KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
                groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    Result...

    2020-07-23 12:13:44.932  INFO 39561 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = 
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 1
        group.instance.id = null
    ...
    

    and

    1: partitions assigned: [my_topic-0]