Why does a Spring Integration QueueChannel deliver Kafka messages sequentially, with a delay between them?


When I use the Spring Integration Kafka support and put a QueueChannel in the flow, the messages that leave the queue channel are processed one at a time, one second apart. I do not understand why. I expected the queue channel to accumulate messages (up to the configured capacity) and to release them for as long as it is not empty and a consumer is available. Why are the messages released sequentially, with a one-second delay between them?

The log follows. As the timestamps show, the messages are received from Kafka immediately, but they are then processed sequentially, one per second:
2020-04-06 13:08:28.108  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 2 - enriched
2020-04-06 13:08:28.109  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 2 - enriched
2020-04-06 13:08:28.110  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 7 - enriched
2020-04-06 13:08:28.111  INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 5 - enriched
2020-04-06 13:08:28.116  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 6 - enriched
2020-04-06 13:08:28.119  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 4 - enriched
2020-04-06 13:08:28.120  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 1 - enriched
2020-04-06 13:08:28.121  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 8 - enriched
2020-04-06 13:08:28.122  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 3 - enriched
2020-04-06 13:08:28.123  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 9 - enriched
2020-04-06 13:08:28.124  INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler   : readKafkaChannel: item: 10 - enriched
2020-04-06 13:08:29.111  INFO 30718 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 7 - enriched
2020-04-06 13:08:30.112  INFO 30718 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 5 - enriched
2020-04-06 13:08:31.112  INFO 30718 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 6 - enriched
2020-04-06 13:08:32.113  INFO 30718 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 4 - enriched
2020-04-06 13:08:33.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 1 - enriched
2020-04-06 13:08:34.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 8 - enriched
2020-04-06 13:08:35.113  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 3 - enriched
2020-04-06 13:08:36.114  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 9 - enriched
2020-04-06 13:08:37.114  INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : channelThatIsProcessingSequential - item: 10 - enriched

package br.com.gubee.kafaexample

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.context.IntegrationContextUtils
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController


@RestController
@RequestMapping(path = ["/testKafka"], produces = [MediaType.APPLICATION_JSON_VALUE])
class TestKafkaResource(private val testKafkaGateway: TestKafkaGateway) {

    @GetMapping("init/{param}")
    fun init(@PathVariable("param", required = false) param: String? = null) {
        (1..10).forEach {
            println("Send async item $it")
            testKafkaGateway.init("item: $it")
        }
    }

}

@MessagingGateway(errorChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
@Component
interface TestKafkaGateway {
    @Gateway(requestChannel = "publishKafkaChannel")
    @Async
    fun init(param: String)
}

@Configuration
@EnableIntegration
class TestKafkaFlow(private val kafkaTemplate: KafkaTemplate<*, *>,
                    private val consumerFactory: ConsumerFactory<*, *>) {

    @Bean
    fun readKafkaChannelTopic(): NewTopic {
        return NewTopic("readKafkaChannel", 40, 1)
    }

    @Bean
    fun publishKafka(): IntegrationFlow {
        return IntegrationFlows
                .from("publishKafkaChannel")
                .transform<String, String> { "${it} - enriched" }
                .handle(
                        Kafka.outboundChannelAdapter(kafkaTemplate)
                                .topic("readKafkaChannel")
                                .sendFailureChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
                .get()
    }

    @Bean
    fun readFromKafka(): IntegrationFlow {
        return IntegrationFlows
                .from(
                        Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                                .configureListenerContainer { kafkaMessageListenerContainer ->
                                    kafkaMessageListenerContainer.concurrency(2)
                                    kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
                                }
                                .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
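                // Queue channel with capacity 10: whatever consumes it is driven by a
                // poller on a scheduler thread, not by the Kafka listener thread.
                .channel { c -> c.queue(10) }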
                .log<String> {
                    "readKafkaChannel: ${it.payload}"
                }
                .channel("channelThatIsProcessingSequential")
                .get()
    }

    @Bean
    fun kafkaFlowAfter(): IntegrationFlow {
        return IntegrationFlows
                .from("channelThatIsProcessingSequential")
                .log<String> {
                    "channelThatIsProcessingSequential - ${it.payload}"
                }
                .get()
    }
}

Solution

  • As Gary said, it is not a good idea to hand Kafka messages off to a QueueChannel. Consumption in Kafka.messageDrivenChannelAdapter() is already asynchronous, so there is really no reason to move the messages to a separate thread.

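    Anyway, it looks like you are using Spring Cloud Stream, whose PollerMetadata is configured for a polling policy of one message per second. The consumer of the queue channel would then take one message per poll, which would explain the one-second gap in your log.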

    If that does not fit your requirements, you can always change .channel { c -> c.queue(10) } to use a second lambda and configure a custom poller there.

    BTW, there is already a Kotlin DSL implementation in Spring Integration: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/kotlin-dsl.html#kotlin-dsl
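
The custom poller suggested above could look roughly like the following sketch. It is not taken from the original answer. It assumes Spring Integration 5.x (where `Pollers.fixedDelay(long)` and `PollerSpec.maxMessagesPerPoll(long)` are available) and attaches the poller to an explicit `.bridge { }` that consumes the queue channel. The class name `FasterPollerFlow`, the bean name, the input channel `queuedInput`, and the values of 100 ms and 10 messages per poll are all hypothetical and chosen only for illustration.

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.Pollers

// Hypothetical configuration class; names and values are illustrative assumptions.
@Configuration
class FasterPollerFlow {

    @Bean
    fun drainQueueFaster(): IntegrationFlow {
        return IntegrationFlows
                // Hypothetical input channel standing in for the Kafka inbound adapter.
                .from("queuedInput")
                // Same bounded queue as in the question.
                .channel { c -> c.queue(10) }
                // Explicit consumer of the queue channel with its own poller:
                // poll every 100 ms and take up to 10 messages per poll,
                // instead of relying on the application's default poller.
                .bridge { e ->
                    e.poller(Pollers.fixedDelay(100L).maxMessagesPerPoll(10L))
                }
                .channel("channelThatIsProcessingSequential")
                .get()
    }
}
```

With settings like these, the buffered messages could be drained in a single poll rather than one per second. If the queue channel is not actually needed, removing it, as recommended at the start of the answer, avoids the poller altogether.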