spring-cloud-stream  spring-cloud-stream-binder-kafka  spring-cloud-function

Is there built-in support for setting an interval for processing message in Spring Cloud Stream?


Currently I have set up a function to consume Kafka messages as follows:

@Bean(name = "streamSrc")
public java.util.function.Consumer<org.springframework.messaging.Message<byte[]>> consumeStream() {
    return message -> {
        byte[] rawMessage = message.getPayload();
        byte[] rawKey = (byte[]) message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        Consumer<?, ?> consumer = (Consumer<?, ?>) message.getHeaders().get(KafkaHeaders.CONSUMER);
        String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        int partitionId = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
        log.debug("processing message {}, key {}, using consumer {}, for topic {}, partition {}", rawMessage, rawKey, consumer, topic, partitionId);
        consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
        //do processing of message
    };
}

Then, in the properties:

spring.cloud.function.definition=streamSrc
spring.cloud.stream.function.bindings.streamSrc-in-0=source1
spring.cloud.stream.bindings.source1.content-type=application/json
spring.cloud.stream.bindings.source1.destination=my-kafka-topic
spring.cloud.stream.bindings.source1.consumer.header-mode=headers
spring.cloud.stream.bindings.source1.group=group1
spring.cloud.stream.bindings.source1.consumer.partitioned=true
spring.cloud.stream.bindings.source1.consumer.concurrency=2
spring.cloud.stream.kafka.bindings.source1.consumer.idleEventInterval=5000
spring.cloud.stream.kafka.bindings.source1.consumer.configuration.max.poll.records=100

This effectively sets an interval of 5 seconds, with a batch of up to 100 records read per interval. Is there a better, perhaps more declarative, way to schedule these intervals? Newer versions of Spring Cloud support "batch mode", e.g. ....consumer.batch-mode=true, but I'm not aware of any way, property-driven or annotation-driven, to enforce a schedule for consuming messages. A bonus would be skipping an interval if not all messages have finished processing. Any alternative ideas are welcome.

P.S. To un-pause, I'm using:

@Bean
public ApplicationListener<ListenerContainerIdleEvent> tryUnpause() {
    return event -> {
        if (!event.getConsumer().paused().isEmpty()) {
            event.getConsumer().resume(event.getConsumer().paused());
        }
    };
}

Solution

  • idleEventInterval

    That's not what that property means; it means "publish a container idle event" if no records have been received within that interval.

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#idleEventInterval

    and https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-containers

    Listener containers are message-driven so records will be processed whenever they are available.

    You can use a ListenerContainerCustomizer bean to set idleBetweenPolls, which delays the next poll until the given time has passed after the records from the previous poll have been processed.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#idleBetweenPolls
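
As a rough illustration of the customizer approach described above, something along these lines should work with the Kafka binder. This is a minimal sketch, not a tested configuration: the class name PollIntervalConfig and the bean name are made up for the example, the 5000 ms value simply mirrors the interval from the question, and the check on the destination name assumes you only want to slow down the my-kafka-topic binding. Also note that, as far as I recall from the Spring for Apache Kafka docs, the actual sleep is limited so that max.poll.interval.ms is not exceeded, so a very long idle time may need that consumer property raised as well.

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

// Hypothetical configuration class; the name is only for illustration.
@Configuration
public class PollIntervalConfig {

    // The binder calls this customizer for each listener container it creates.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> idleBetweenPollsCustomizer() {
        return (container, destinationName, group) -> {
            // Assumption: only the binding that reads my-kafka-topic should be delayed.
            if ("my-kafka-topic".equals(destinationName)) {
                // Sleep 5 seconds after the previous batch has been processed,
                // before the container polls again.
                container.getContainerProperties().setIdleBetweenPolls(5000L);
            }
        };
    }
}
```

With this in place the pause()/resume() calls and the idle-event listener from the question should no longer be needed for throttling, since the container itself waits between polls. max.poll.records=100 can stay as it is to cap the batch size per poll.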