spring-integration spring-integration-dsl

IntegrationFlowDefinition.aggregate doesn't work: Maybe the CorrelationStrategy is failing?


Error message: Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?

My implementation:

    @Bean
    public IntegrationFlow start() {
        return IntegrationFlows
                .from("getOrders")
                .split()
                .publishSubscribeChannel(c -> c.subscribe(s -> s.channel(q -> q.queue(1))
                        .<Order, Message<?>>transform(p -> MessageBuilder.withPayload(new Item(p.getItems())).setHeader(ORDERID, p.getOrderId()).build())
                        .split(Item.class, Item::getItems)
                        .transform() // assume this creates one object per item, say an ItemProperty;
                                     // it returns a message: MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build();
                        .aggregate() // this should aggregate the ItemProperty objects
                        .handle() // the handler expects a List<ItemProperty> as input
                ))
                .get();
    }

Both splitters work fine. I've also tested the transformer after the second splitter, and it works fine too. But when it comes to the aggregate step, the flow fails. What am I missing here?


Solution

  • You are missing the fact that a transformer is the kind of endpoint that deals with the whole message as is: if you create a message yourself, the framework does not modify it. So with MessageBuilder.withPayload(createItemProperty(getItemName, getItemId)).build(); you lose the sequence details headers (correlationId, sequenceNumber, sequenceSize) that the splitter added. The aggregator after it then doesn't know what to do with your message, because you configured it with the default correlation strategy but the message doesn't carry the respective headers.

    Technically, I don't see a reason to create a message manually there: a simple return createItemProperty(getItemName, getItemId); should be enough. The framework will take care of creating the message on your behalf and will copy the respective request message headers.

    If you still think you need to create the message yourself in that transform, use copyHeaders() on the MessageBuilder to copy the headers from the request message, so that the required sequence details headers are carried over.
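
To make the mechanism concrete, here is a minimal plain-Java sketch of what happens to the headers. It is a toy model, not Spring Integration code: the Msg class, the split/aggregate helpers and the "property:" payloads are hypothetical stand-ins, and only the header names mirror the framework's sequence details headers. It assumes the aggregator correlates purely on the correlationId header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SequenceHeaderDemo {

    // Hypothetical stand-in for a message: a payload plus a header map.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Toy splitter: stamps every part with the sequence details headers.
    static List<Msg> split(String correlationId, List<String> items) {
        List<Msg> parts = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            Map<String, Object> headers = new HashMap<>();
            headers.put("correlationId", correlationId);
            headers.put("sequenceNumber", i + 1);
            headers.put("sequenceSize", items.size());
            parts.add(new Msg(items.get(i), headers));
        }
        return parts;
    }

    // Builds a brand-new message: the splitter's headers are lost.
    static Msg transformDroppingHeaders(Msg in) {
        return new Msg("property:" + in.payload, new HashMap<>());
    }

    // Builds a new message but copies the request headers onto it.
    static Msg transformCopyingHeaders(Msg in) {
        return new Msg("property:" + in.payload, in.headers);
    }

    // Toy aggregator: rejects any part without a correlation key.
    static List<Object> aggregate(List<Msg> parts) {
        List<Object> group = new ArrayList<>();
        for (Msg part : parts) {
            if (part.headers.get("correlationId") == null) {
                throw new IllegalStateException("Null correlation not allowed.");
            }
            group.add(part.payload);
        }
        return group;
    }

    public static void main(String[] args) {
        List<Msg> parts = split("order-1", Arrays.asList("pen", "ink"));

        List<Msg> copied = new ArrayList<>();
        List<Msg> dropped = new ArrayList<>();
        for (Msg part : parts) {
            copied.add(transformCopyingHeaders(part));
            dropped.add(transformDroppingHeaders(part));
        }

        System.out.println(aggregate(copied)); // prints [property:pen, property:ink]
        try {
            aggregate(dropped);
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage()); // prints Failed: Null correlation not allowed.
        }
    }
}
```

In the real flow, returning the plain payload from the transformer, or calling copyHeaders() on the MessageBuilder, plays the role of transformCopyingHeaders here.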