Search code examples
javaspringspring-bootapache-kafkaconnection

In Java, How to close Kafka connection manually?


My code is in java + Spring Boot

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void produce(String message) {
    logger.info("Producer : Kafka Topic -> {}, Kafka Message -> {}", TOPIC, message);
    kafkaTemplate.send(TOPIC, message);
}

@KafkaListener(topics = TOPIC, groupId = GROUP_ID)
public void consume(String message) {
    System.out.println("Kafka consume value ->" + message);
    logger.info("Consumer : Kafka Message -> {}", message);
    try {
        setKafkaStatus(Integer.parseInt(message.trim()));
    }catch (Exception e) {
        logger.info("Kafka message is not Integer");
        setKafkaStatus(0);
    }
}

public void closeConnection() {
    //code for close connection

}

Solution

  • @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    public void closeConnection() {
        this.registry.stop();
    }