I am using the following code to create my Kafka consumer container:
public void newContainer(String topic, int partition, int idlePeriod) {
    this.factory.setConsumerFactory(consumerFactory);
    this.factory.getContainerProperties().setIdleEventInterval(idlePeriod * 1000L);
    this.factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    ConcurrentMessageListenerContainer<String, String> container = this.factory
            .createContainer(new TopicPartitionOffset(topic, partition));
    String groupId = UUID.randomUUID().toString();
    container.getContainerProperties().setGroupId(groupId);
    container.setupMessageListener((MessageListener<String, String>) record -> {
        kafkaService.proccessorConsumer(record);
    });
    this.containers.put(groupId, container);
    container.start();
}
How can I perform manual acknowledgment? When I was using @KafkaListener I was able to do so, since the ack object is in the header, but now I need to do it with the code above. Any help, please. Below is the full class code:
@Component
public class DynamicListener implements ConsumerSeekAware {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;
    @Autowired
    private ProvisionConfig provisionConfig;
    @Autowired
    private KafkaService kafkaService;

    private int oldIdlePeriod;
    private ConcurrentMessageListenerContainer<String, String> container;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers = new ConcurrentHashMap<>();

    DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    public ConcurrentMessageListenerContainer<String, String> newContainer(String topic, int partition,
            int idlePeriod) {
        this.factory.setConsumerFactory(consumerFactory);
        this.factory.getContainerProperties().setIdleEventInterval(idlePeriod * 1000L);
        // this.factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        ConcurrentMessageListenerContainer<String, String> container = this.factory
                .createContainer(new TopicPartitionOffset(topic, partition));
        container.setupMessageListener((MessageListener<String, String>) record -> {
            // here I am consuming the topic
            kafkaService.proccessorConsumer(record);
        });
        this.containers.put("provisioning_group", container);
        container.start();
        return container;
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        log.warn("Idle period has been captured", event);
        kafkaService.processIdelConsumer();
    }

    @PostConstruct
    private void init() {
        this.container = newContainer(getInputTopic(), 0, getIdlePeriod());
        this.oldIdlePeriod = provisionConfig.getIdlePeriod();
        log.info("*********************** A new container has initialized ***************************** ");
    }
}
Note the place where I am consuming the topic: I want to ack after the processing has finished.
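Use an AcknowledgingMessageListener instead of that generic MessageListener. It is meant for the manual ack modes, so the container also needs AckMode.MANUAL or AckMode.MANUAL_IMMEDIATE (the setAckMode line that is commented out in your full class):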
container.setupMessageListener((AcknowledgingMessageListener<String, String>) (record, acknowledgment) -> {
    // here I am consuming the topic
    kafkaService.proccessorConsumer(record);
    acknowledgment.acknowledge();
});
More in docs: https://docs.spring.io/spring-kafka/reference/html/#message-listeners
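To see the "process first, acknowledge afterwards" ordering in isolation, here is a minimal plain-Java sketch that runs without Kafka. Everything in it is an assumption made for illustration: Ack and AckListener are hypothetical stand-ins that only mirror the shape of Spring Kafka's Acknowledgment and AcknowledgingMessageListener, ManualAckDemo is a toy dispatcher rather than a real container, and process() stands in for kafkaService.proccessorConsumer(record).

```java
// Hypothetical stand-in mirroring the shape of Spring Kafka's Acknowledgment (not the real type).
interface Ack {
    void acknowledge();
}

// Hypothetical stand-in mirroring the shape of AcknowledgingMessageListener (not the real type).
interface AckListener<T> {
    void onMessage(T record, Ack ack);
}

// Toy dispatcher: records an offset as committed only when the listener acknowledges it.
class ManualAckDemo {
    // Last acknowledged offset; -1 means nothing has been acknowledged yet.
    private long committedOffset = -1;

    void deliver(long offset, String value, AckListener<String> listener) {
        try {
            // The Ack handed to the listener commits this record's offset when invoked.
            listener.onMessage(value, () -> committedOffset = offset);
        } catch (RuntimeException e) {
            // Processing failed before acknowledge() was reached, so nothing is committed.
            // This toy simply swallows the failure; a real container has its own error handling.
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    // Stand-in for kafkaService.proccessorConsumer(record): rejects blank payloads.
    static void process(String value) {
        if (value.trim().isEmpty()) {
            throw new IllegalArgumentException("blank payload");
        }
    }

    public static void main(String[] args) {
        ManualAckDemo demo = new ManualAckDemo();
        AckListener<String> listener = (record, ack) -> {
            process(record);    // do the work first
            ack.acknowledge();  // acknowledge only after the work succeeded
        };
        demo.deliver(0, "first", listener);
        demo.deliver(1, " ", listener); // processing throws, so offset 1 is never acknowledged
        System.out.println(demo.committedOffset()); // prints 0
    }
}
```

The point of the sketch is only the ordering inside the lambda: because acknowledge() is the last statement, a failure in processing leaves the record unacknowledged, which is the behaviour you get from placing acknowledgment.acknowledge() after kafkaService.proccessorConsumer(record) in the listener above.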