Is it possible to use KafkaMessageDrivenChannelAdapter (KafkaMessageListenerContainer) with a Poller to read from Kafka every x minutes?
I tried creating this bean:
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(5, TimeUnit.MINUTES));
        return pollerMetadata;
    }
But it does not seem to have any effect.
No; the KafkaMessageDrivenChannelAdapter is push, not pull, technology: its listener container delivers records as they arrive, so a poller has no effect on it.
Use the pollable inbound channel adapter instead:
    @Bean
    public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
        return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, "myTopic")
                        .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
                .handle(System.out::println)
                .get();
    }
Or, with annotation-based configuration:
    @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
    @Bean
    public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
        KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
        source.setGroupId("myGroupId");
        source.setClientId("myClientId");
        return source;
    }
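You must ensure that the consumer's max.poll.interval.ms is larger than your poll interval, to avoid a rebalance. The Kafka default is 300000 ms (5 minutes), so for a poll interval of 5 minutes or more you need to raise it.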
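To illustrate the max.poll.interval.ms point, here is a minimal sketch of deriving that value from the poller's interval plus a safety margin. The class name PollIntervalConfig, the helper consumerProps, the one-minute margin, and the broker address are all hypothetical; plain string keys are used so the snippet compiles without the Kafka client on the classpath. A real consumer factory would also need deserializer settings.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class PollIntervalConfig {

    // Hypothetical helper: builds consumer properties whose max.poll.interval.ms
    // exceeds the poller's interval by the given safety margin.
    static Map<String, Object> consumerProps(Duration pollInterval, Duration margin) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "myGroupId");
        // max.poll.interval.ms is an int-valued Kafka consumer setting.
        props.put("max.poll.interval.ms",
                Math.toIntExact(pollInterval.plus(margin).toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Poll every 5 minutes, allow 1 extra minute before a rebalance is triggered.
        Map<String, Object> props =
                consumerProps(Duration.ofMinutes(5), Duration.ofMinutes(1));
        System.out.println(props.get("max.poll.interval.ms")); // prints 360000
    }
}
```

A map like this could be passed to the ConsumerFactory that the adapter receives (for example a DefaultKafkaConsumerFactory), so the consumer tolerates the gap between polls.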