Search code examples
java, apache-kafka, spring-kafka

How to manually acknowledge a Kafka message without using @KafkaListener


I am using the following code to create my Kafka consumer container:

/**
 * Builds a listener container for a single topic partition, registers it and starts it.
 *
 * <p>The shared factory is configured first (consumer factory, idle event interval and
 * {@code MANUAL_IMMEDIATE} ack mode), then a container is created from it. The container
 * gets a freshly generated random UUID as its group id and is stored in {@code containers}
 * under that same id.
 *
 * @param topic      topic to consume from
 * @param partition  partition of {@code topic} to consume from
 * @param idlePeriod multiplied by 1000 and used as the idle event interval
 *                   (presumably seconds converted to milliseconds)
 */
public void newContainer(String topic, int partition, int idlePeriod) {
    // Independent of the factory setup below, so generate the unique id up front.
    String groupId = UUID.randomUUID().toString();

    // Configure the shared factory before creating the container from it.
    this.factory.setConsumerFactory(consumerFactory);
    this.factory.getContainerProperties().setIdleEventInterval(idlePeriod * 1000L);
    this.factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);

    TopicPartitionOffset assignment = new TopicPartitionOffset(topic, partition);
    ConcurrentMessageListenerContainer<String, String> container = this.factory.createContainer(assignment);
    container.getContainerProperties().setGroupId(groupId);

    // Plain record listener: every record is handed straight to the service.
    MessageListener<String, String> recordListener = record -> kafkaService.proccessorConsumer(record);
    container.setupMessageListener(recordListener);

    this.containers.put(groupId, container);
    container.start();
}

How can I perform manual acknowledgment here? When I was using @KafkaListener I was able to do so, since the ack object is available in the header, but now I need to do it with the code above. Any help, please. Below is the full class code:

/**
 * Creates, starts and keeps track of Kafka listener containers that are built
 * programmatically from a {@link ConcurrentKafkaListenerContainerFactory}.
 *
 * <p>One container is created automatically after construction (see {@link #init()}),
 * and idle events published by the containers are forwarded to {@code KafkaService}.
 *
 * <p>NOTE(review): {@code log}, {@code getInputTopic()} and {@code getIdlePeriod()} are used
 * below but are not declared in this listing; they are presumably provided elsewhere
 * (e.g. a logger annotation / omitted helper methods) - confirm.
 */
@Component
public class DynamicListener implements ConsumerSeekAware {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Autowired
    private ProvisionConfig provisionConfig;

    @Autowired
    private KafkaService kafkaService;

    /** Idle period read from {@code provisionConfig} when the initial container was created. */
    private int oldIdlePeriod;

    /** The container created by {@link #init()}. */
    private ConcurrentMessageListenerContainer<String, String> container;

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    /** Containers created by {@link #newContainer(String, int, int)}, keyed by a group name. */
    private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers = new ConcurrentHashMap<>();

    DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    /**
     * Builds a listener container for a single topic partition, registers it under the
     * key {@code "provisioning_group"} and starts it.
     *
     * @param topic      topic to consume from
     * @param partition  partition of {@code topic} to consume from
     * @param idlePeriod multiplied by 1000 and used as the idle event interval
     *                   (presumably seconds converted to milliseconds)
     * @return the started container
     */
    public ConcurrentMessageListenerContainer<String, String> newContainer(String topic, int partition,
            int idlePeriod) {
        this.factory.setConsumerFactory(consumerFactory);
        this.factory.getContainerProperties().setIdleEventInterval(idlePeriod * 1000L);
        // this.factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ConcurrentMessageListenerContainer<String, String> container = this.factory
                .createContainer(new TopicPartitionOffset(topic, partition));
        container.setupMessageListener((MessageListener<String, String>) record -> {

            //here I am consuming the topic
            kafkaService.proccessorConsumer(record);

        });
        // Fixed key: a later call replaces the map entry of an earlier container.
        this.containers.put("provisioning_group", container);
        container.start();
        return container;
    }

    /**
     * Reacts to an idle event published by a listener container: logs it and
     * notifies {@code KafkaService}.
     *
     * @param event the idle event that was captured
     */
    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        // The placeholder is required; without it the event argument is never rendered in the log line.
        log.warn("Idle period has been captured: {}", event);
        kafkaService.processIdelConsumer();
    }

    /**
     * Creates the initial container for partition 0 of the input topic once
     * dependency injection has completed, and remembers the configured idle period.
     */
    @PostConstruct
    private void init() {
        this.container = newContainer(getInputTopic(), 0, getIdlePeriod());
        this.oldIdlePeriod = provisionConfig.getIdlePeriod();
        log.info("*********************** A new container has initilized ***************************** ");
    }

}

Notice the place where I am consuming the topic: I want to acknowledge the message there, after the processing has finished.


Solution

  • Use an AcknowledgingMessageListener instead of the generic MessageListener:

    container.setupMessageListener((AcknowledgingMessageListener<String, String>) (record, acknowledgment) -> {
    
    
            //here I am consuming the topic
            kafkaService.proccessorConsumer(record);
    
            acknowledgment.acknowledge();
        });
    

    More in docs: https://docs.spring.io/spring-kafka/reference/html/#message-listeners