
PollerMetadata with KafkaMessageDrivenChannelAdapter


Is it possible to use KafkaMessageDrivenChannelAdapter (KafkaMessageListenerContainer) with a poller to read from Kafka every x minutes?

I tried creating this bean:

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(5, TimeUnit.MINUTES));
        return pollerMetadata;
    }

But it does not seem to have any effect.


Solution

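  • No; the KafkaMessageDrivenChannelAdapter is push, not pull, technology: its listener container delivers records as they arrive, so a poller has no effect on it.

    Use the pollable inbound channel adapter (KafkaMessageSource) instead.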

    https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-pollable

    @Bean
    public IntegrationFlow flow(ConsumerFactory<String, String> cf)  {
        return IntegrationFlow.from(Kafka.inboundChannelAdapter(cf, "myTopic")
                    .groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
                .handle(System.out::println)
                .get();
    }
    

    or

    @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
    @Bean
    public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf)  {
        KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
        source.setGroupId("myGroupId");
        source.setClientId("myClientId");
        return source;
    }
    

    You must ensure that the consumer's max.poll.interval.ms (default 300000, i.e. 5 minutes) is larger than your poller interval, to avoid a rebalance.
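
    For a 5-minute poller like the one in the question, the default max.poll.interval.ms would not be larger than the poll interval, so it needs raising. A minimal sketch of the consumer properties follows, assuming a hypothetical helper class PollIntervalConfig, an assumed broker at localhost:9092, and an arbitrary safety margin chosen by the caller; only the property names are real Kafka consumer settings.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Integration or Kafka.
final class PollIntervalConfig {

    private PollIntervalConfig() {
    }

    // Builds consumer properties whose max.poll.interval.ms exceeds the
    // poller period by the given safety margin.
    static Map<String, Object> consumerProps(Duration pollerPeriod, Duration margin) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "myGroupId");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // max.poll.interval.ms is an int-valued setting; toIntExact throws
        // ArithmeticException if the computed value does not fit in an int.
        long maxPollMs = pollerPeriod.plus(margin).toMillis();
        props.put("max.poll.interval.ms", Math.toIntExact(maxPollMs));
        return props;
    }
}
```

    The resulting map could be passed to new DefaultKafkaConsumerFactory<>(props) to create the ConsumerFactory injected into either of the beans above; for example, consumerProps(Duration.ofMinutes(5), Duration.ofMinutes(1)) sets the interval to six minutes.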