spring-boot, apache-kafka, spring-kafka

Not able to consume a message from a Kafka topic through Spring Boot, but the same message is consumed by the console consumer


I have a simple Spring Boot application that contains both the Producer and the Consumer as services; they live in the same application.

My Controller receives the message, but the Consumer layer of the application never gets it. So from the Controller I feed the message to the Producer, but it is never picked up by the Consumer. The message not being consumed is the problem.
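For reference, the controller and producer side look roughly like this (a simplified sketch; the actual class and method names in my project may differ slightly):

// Simplified, illustrative sketch of the controller that feeds messages to the producer.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/messages")
    public void writeStringMessageToTopic(@RequestBody String message) {
        logger.info("writeStringMessageToTopic called.");
        logger.info("received string message = '{}'", message);
        // Hand the message to the producer: KafkaTemplate publishes it to the topic.
        kafkaTemplate.send("myTopic", message);
    }
}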

Here is the Eclipse IDE log output I get when I run the Spring Boot application and the controller receives the first message:

2021-02-23 22:33:50.637  INFO 13792 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
2021-02-23 22:33:50.898  INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController       : writeStringMessageToTopic called.
2021-02-23 22:33:50.899  INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController       : recieved string message = '"22:26 Hello World"'
2021-02-23 22:33:50.953  INFO 13792 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = true
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-02-23 22:33:51.200  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-02-23 22:33:51.201  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-02-23 22:33:51.201  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1614099831197
2021-02-23 22:33:51.645  INFO 13792 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ

After the last line above, the log message from my KafkaConsumer class for the consumed message is never printed. It seems the Consumer is not even being triggered. Why is this? Am I making a mistake in the configuration?

Here is my KafkaConsumerConfig.java class

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("bar");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }

   
    public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(entryObjectConsumerFactory());
        return factory;
    }
}

And here is my KafkaConsumer.java class

public class KafkaConsumer {
    
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "myTopic", containerFactory = "greetingKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'", entryObject.toString());
    }
}

This is my application.properties file

kafka.bootstrapAddress=localhost:9092
message.topic.name=myTopic

Kafka is running; that has been verified on my machine, and the same topic can be consumed from my local terminal with the console consumer. But with the Spring Boot consumer I am not able to receive anything. Is there a mistake in my code?


Solution

  • The class containing the @KafkaListener methods must be registered as a Spring bean: declare it as a @Bean, or annotate it with @Component or @Service (the latter if it sits in the service layer, to be more specific). Otherwise Spring never detects the listener methods and no consumer is started.
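
For example, annotating the existing KafkaConsumer class is enough (a sketch of the corrected class; the second listener is omitted for brevity):

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service // registers the class as a Spring bean so the @KafkaListener methods are picked up
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }
}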