kotlin spring-integration spring-integration-dsl kotlin-dsl

Spring Integration Kotlin DSL aggregate configuration


I'm trying to aggregate messages into a list so that I can use the AmqpOutboundEndpoint.multiSend option. I followed this solution, but I'm using the Kotlin DSL instead of XML. Here is the code sample:

@Configuration
@EnableIntegration
class SampleConfiguration {

  @Bean
  fun sampleFlow(amqpTemplate: AmqpTemplate): StandardIntegrationFlow {
    return IntegrationFlow
      .from("inputChannel")
      .aggregate(Consumer<AggregatorSpec> {
        it.releaseExpression("size() == 100")
          .groupTimeout(1000)
          .sendPartialResultOnExpiry(true)
          .correlationExpression("T(Thread).currentThread().id")
          .poller { p: PollerFactory -> p.fixedRate(1000).maxMessagesPerPoll(100) }
      })
      .handle(Amqp.outboundAdapter(amqpTemplate).exchangeName("sampleExchange").multiSend(true))
      .get()
  }
}

I'm getting a compilation error:

Overload resolution ambiguity: 
public open fun aggregate(aggregator: Consumer<AggregatorSpec!>?): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder
public open fun aggregate(aggregatorProcessor: Any): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder

I couldn't find a way around this problem. How can I configure an aggregator using the Kotlin DSL? I couldn't find any working code examples online.

Spring Integration version: 6.2.1
Kotlin version: 1.9.22


Solution

  • What you have so far is not the Kotlin DSL, but rather the Java API used from Kotlin code. There can indeed be compatibility problems between those two languages, especially when lambdas are involved.

    And that's why we developed a dedicated Kotlin DSL for Spring Integration a while ago: https://docs.spring.io/spring-integration/reference/kotlin-dsl.html

    So, something like this should work for you:

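        import org.springframework.amqp.core.AmqpTemplate
        import org.springframework.context.annotation.Bean
        import org.springframework.integration.amqp.dsl.Amqp
        import org.springframework.integration.dsl.integrationFlow

        @Bean
        fun sampleFlow(amqpTemplate: AmqpTemplate) =
                integrationFlow("inputChannel") {
                    // The lambda's receiver is the AggregatorSpec, so no Consumer wrapper is needed.
                    aggregate {
                        releaseExpression("size() == 100")
                        groupTimeout(1000)
                        sendPartialResultOnExpiry(true)
                        correlationExpression("T(Thread).currentThread().id")
                        poller { it.fixedRate(1000).maxMessagesPerPoll(100) }
                    }
                    // Send the aggregated list of messages in one multiSend call.
                    handle(Amqp.outboundAdapter(amqpTemplate)
                            .exchangeName("sampleExchange")
                            .multiSend(true))
                }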
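
    To illustrate why the `aggregate { ... }` form reads so differently from the Java API call, here is a minimal, self-contained sketch of the lambda-with-receiver pattern that Kotlin DSLs of this kind are typically built on. It does not use Spring Integration at all: `SketchAggregatorSpec`, `SketchFlowBuilder` and `sketchFlow` are hypothetical stand-ins invented for this example, and the real DSL's internals may well differ.

    ```kotlin
    // Hypothetical stand-in for a spec class; NOT the real AggregatorSpec.
    class SketchAggregatorSpec {
        var releaseExpr: String? = null
            private set
        var timeoutMillis: Long? = null
            private set

        // Fluent setters return `this`, mirroring a Java builder style.
        fun releaseExpression(expression: String) = apply { releaseExpr = expression }
        fun groupTimeout(millis: Long) = apply { timeoutMillis = millis }
    }

    // Hypothetical flow builder that just records the configured steps as strings.
    class SketchFlowBuilder {
        val steps = mutableListOf<String>()

        // A single function-typed parameter with a receiver: inside the lambda,
        // `this` is the spec, and there is no second overload to choose between.
        fun aggregate(configure: SketchAggregatorSpec.() -> Unit) {
            val spec = SketchAggregatorSpec().apply(configure)
            steps += "aggregate(release=${spec.releaseExpr}, timeout=${spec.timeoutMillis})"
        }
    }

    // Hypothetical entry point, shaped like a DSL builder function.
    fun sketchFlow(configure: SketchFlowBuilder.() -> Unit): List<String> =
        SketchFlowBuilder().apply(configure).steps

    fun main() {
        val steps = sketchFlow {
            aggregate {
                releaseExpression("size() == 100")
                groupTimeout(1000)
            }
        }
        println(steps) // prints [aggregate(release=size() == 100, timeout=1000)]
    }
    ```

    In this sketch the configuration lambda is an ordinary Kotlin function type rather than a Java functional interface, so the call site needs neither a `Consumer<...>` wrapper nor an explicit `it`.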