kotlin apache-camel spring-camel

How can I split a GroupedExchangeAggregationStrategy aggregate exchange into the original exchanges?


After aggregating exchanges with a GroupedExchangeAggregationStrategy, I need to split the aggregate back into the original exchanges so that I can emit individual processing-time metrics.

I tried the split shown in the route below, but each resulting split exchange wraps the original exchange and puts it in the message body.

Is it possible to split a GroupedExchangeAggregationStrategy aggregate exchange into the original exchanges without the wrapper exchange? I need to use the original exchange properties and would like to do so with a SpEL expression.

.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
    .completionInterval(1000)
    .completeAllOnStop()
    .process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
    .to(/* micrometer timer metric using SpEL expression */)
    // ^- the resulting split exchange is wrapped in another exchange

In the event that this isn't currently supported, I'm trying to figure out the best way to implement this behavior on my own without creating a custom Splitter processor for this single feature. I was hoping to somehow override the SplitterIterable that does the wrapping but it doesn't appear to be possible.


Solution

  • Yes, the GroupedExchangeAggregationStrategy does nothing more than create a java.util.List of all the Exchanges. The Splitter EIP, on the other hand, by default splits a List into its elements and puts each element into the message body of a new Exchange. That is why you end up with an Exchange that contains an Exchange in its body.

    What you need is an AggregationStrategy that collects the message bodies in a List instead of the Exchanges themselves.

    You could try Camel's FlexibleAggregationStrategy, which is configurable through a fluent API.

    new FlexibleAggregationStrategy()
        .storeInBody()
        .accumulateInCollection(ArrayList.class)
        .pick(new SimpleExpression("${body}"));

    This should create an AggregationStrategy that extracts the body of every message (you can perhaps omit the pick method since body extraction is the pick default), collects them in a List and stores the aggregation in the message body.

    To split this aggregate again, a simple split(body()) should be enough.
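
    Put together, the suggestion above might look like the following route. This is only a sketch in Kotlin (to match the question's DSL), assuming Camel 2.x, where FlexibleAggregationStrategy lives in org.apache.camel.util.toolbox. The class name BodyListRoute and the direct:in and log:split endpoints are placeholders, not part of the original route.

    ```kotlin
    import org.apache.camel.builder.RouteBuilder
    import org.apache.camel.util.toolbox.FlexibleAggregationStrategy

    // Hypothetical route class; the endpoint URIs are placeholders.
    class BodyListRoute : RouteBuilder() {
        override fun configure() {
            // Collect every incoming message body into an ArrayList and
            // store that list in the body of the aggregated exchange.
            // pick(...) is left out on the assumption that the body is the default.
            val bodiesInList = FlexibleAggregationStrategy<Any>()
                .storeInBody()
                .accumulateInCollection(ArrayList::class.java)

            from("direct:in")
                .aggregate(constant(true), bodiesInList)
                    .completionInterval(1000L)
                    .completeAllOnStop()
                // The aggregated body is now a plain List of bodies,
                // so each split exchange carries one original body.
                .split(body())
                    .to("log:split")
        }
    }
    ```

    Each split exchange then has an original body as its own body, but none of the original headers or properties.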

    EDIT due to comment

    Yes, you are right, a side effect of my solution is that you lose properties and headers of the original messages because it only aggregates the message bodies.

    What you want is to split the List of Exchanges back into the originals, i.e. the Splitter must not create new Exchanges but reuse the ones already present and throw away the aggregated wrapper Exchange.

    As far as I can see in the source code of the Splitter, this is currently not possible:

    Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
    ...
    if (part instanceof Message) {
        newExchange.setIn((Message) part);
    } else {
        Message in = newExchange.getIn();
        in.setBody(part);
    }
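
    One possible workaround, short of a custom Splitter, is to let the Splitter wrap each original exchange and then unwrap it in the very next step. The sketch below is an illustration under assumptions, not a built-in Camel feature: UnwrapGroupedExchange is a hypothetical processor name, and it relies only on Exchange.getProperties, Exchange.setProperty and Message.copyFrom.

    ```kotlin
    import org.apache.camel.Exchange
    import org.apache.camel.Processor

    // Hypothetical helper: copies the data of the original exchange, which the
    // Splitter placed in the message body, onto the current split exchange.
    class UnwrapGroupedExchange : Processor {
        override fun process(exchange: Exchange) {
            // Do nothing if the body is not an Exchange.
            val original = exchange.getIn().getBody(Exchange::class.java) ?: return

            // Copy the original exchange properties. Same-named properties
            // already present on the split exchange are overwritten.
            for ((name, value) in original.properties) {
                exchange.setProperty(name, value)
            }

            // Replace body and headers with those of the original message.
            exchange.getIn().copyFrom(original.getIn())
        }
    }
    ```

    It would be placed directly after the split, for example .split(exchangeProperty(Exchange.GROUPED_EXCHANGE)).process(UnwrapGroupedExchange()), so that later steps such as a SpEL expression see the original properties. Note that the split exchange is still a new Exchange with its own exchange ID. It only carries a copy of the original's data.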