Search code examples
spring-boot, kafka-consumer-api, spring-kafka

Kafka consumer not receiving any messages from a topic even though the group id and client id are set correctly


Using @InboundChannelAdapter, I am polling messages from a topic, but I do not receive any messages when I post JSON from the command line. If I send plain text instead, an exception is thrown. I am trying to consume a JSON object ({"name" : "foo"}) and convert it to the CreateResponse class.

Command to send message to Topic:

C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw

Code:

package com.cae.egca.connector.config;



@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {



    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }


    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                                                         RecordMessageConverter messageConverter) {

        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("myTopic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
        kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
        kafkaMessageSource.setMessageConverter(messageConverter);
        kafkaMessageSource.setPayloadType(CreateResponse.class);

        return kafkaMessageSource;
    }


    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);


        return new DefaultKafkaConsumerFactory(props);
    }

 @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
        log.info("In SERVICE ACTIVATOR ");
        MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
        fileServices.processFile(cr);
        acknowledgment.acknowledge();
        log.info("ACKNOWLEDGED: ");
    }
    @Bean
    QueueChannel inputChannel() {
        return new QueueChannel();
    }

}

Exception:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConvertework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    ... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"sdwadaw"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
    ... 21 more




Solution

  • This works fine for me...

    /**
     * Minimal Spring Boot sample: sends one JSON record to {@code myTopic} on startup and
     * consumes it through a polled {@link KafkaMessageSource}, printing the converted payload.
     */
    @SpringBootApplication
    public class So76876015Application {

        /** Application entry point. */
        public static void main(String[] args) {
            SpringApplication.run(So76876015Application.class, args);
        }

        /** Converter that maps the String (JSON) record value onto the configured payload type. */
        @Bean
        RecordMessageConverter messageConverter() {
            return new StringJsonMessageConverter();
        }

        /**
         * Polled message source for {@code myTopic}; results are sent to {@code inputChannel}.
         *
         * @param consumerFactory  factory for the underlying Kafka consumer
         * @param messageConverter converter applied to each record
         * @return the configured source
         */
        @Bean
        @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
        KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                RecordMessageConverter messageConverter) {

            KafkaMessageSource<String, String> source =
                    new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("myTopic"));

            // Group and client id are set on the source's own ConsumerProperties instance.
            ConsumerProperties sourceProperties = source.getConsumerProperties();
            sourceProperties.setGroupId("myGroupId");
            sourceProperties.setClientId("myClientId");

            source.setMessageConverter(messageConverter);
            source.setPayloadType(CreateResponse.class);
            return source;
        }

        /** Prints every payload that arrives on {@code inputChannel}. */
        @ServiceActivator(inputChannel = "inputChannel")
        void consumeIt(CreateResponse cr) {
            System.out.println(cr);
        }

        /** Declares {@code myTopic} with one partition and one replica. */
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("myTopic")
                    .partitions(1)
                    .replicas(1)
                    .build();
        }

        /** Sends a single JSON record to {@code myTopic} once the application has started. */
        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.send("myTopic", "{\"name\":\"foo\"}");
        }

        /** Payload type with a single {@code name} property. */
        public static class CreateResponse {

            private String name;

            protected String getName() {
                return name;
            }

            protected void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "CreateResponse [name=" + name + "]";
            }

        }

    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    
    CreateResponse [name=foo]