Search code examples

Spring Integration: get all headers involved in an aggration and not just the last one

I have a Spring integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))

And a service activator defined like this:

public class ServiceActivator {

    public void myMethod(List<> Collection<MyEvent> events) {

What I'm trying to do is to change the myMethod to also take a list of particular headers associated with each message in the aggregation. In my case I'd like to get a hold of the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG for each message so that I can perform an ACK or NACK for each message to RabbitMQ (note that I'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK's/NACK's after myMethod has executed).

I've tried for example this approach:

public class ServiceActivator {

    public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels, 
                         @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags, 
                         Collection<MyEvent> events) {

but here I only seem to get header values for the last message (i.e. channels and tags are always of size 1 even though there are multiple events in the events collection).

I've also tried changing Collection<MyEvent> to Collection<Message> (org.springframework.messaging.Message) in order to try to manually extract the headers but this fails with:

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message

since the message has already been transformed by the message converter defined in the IntegrationFlow.

How can I achieve this?


  • You need a custom outputProcessor on your aggregator to return the message you desire (with lists of channels/delivery tags in headers and list of events in the payload).