Search code examples

Camel: Aggregator doesn't persist Exchange properties

I'm using camel:aggregate backed by jdbc and it seems it doesn't save Exchange properties. For instance, if I configure the following route and the execution is stopped once aggregation has been completed and just before execute camel:to(log) forcing the aggregation to retrieve data from database when restarted, then camel:to(log) won't print the property myProperty

<camel:route id="myRoute">
    <camel:from uri="direct:in"/>

    <camel:setProperty propertyName="myProperty">

    <camel:aggregate strategyRef="myStrategy" aggregationRepositoryRef="myAggregationRepo" discardOnCompletionTimeout="true" completionTimeout="86400000" >
            <camel:simple>${property.partlastcorrelationwaitmore} == false</camel:simple>

        <camel:to uri="log:com.test?showAll=true"/>


My aggregation repository is configured this way:

<bean id="myAggregationRepo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository" init-method="start" destroy-method="stop">
    <property name="transactionManager" ref="transactionManager"/>
    <property name="repositoryName" value="PROC_AGG"/>
    <property name="dataSource" ref="oracle-ds"/>
    <property name="lobHandler">
        <bean class="">
            <property name="nativeJdbcExtractor">
                <bean class=""/>

How can I save properties when using the Aggregator?


  • I'll reply myself. As seen on the code JdbcCamelCodec doesn't allow to save properties when backing the Aggregator with a database:

    public final class JdbcCamelCodec {
        public byte[] marshallExchange(CamelContext camelContext, Exchange exchange) throws IOException {
            // use DefaultExchangeHolder to marshal to a serialized object
            DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);
            // add the aggregated size property as the only property we want to retain
            DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_SIZE, exchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class));
            // add the aggregated completed by property to retain
            DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_COMPLETED_BY, exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class));
            // add the aggregated correlation key property to retain
            DefaultExchangeHolder.addProperty(pe, Exchange.AGGREGATED_CORRELATION_KEY, exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class));
            // persist the from endpoint as well
            if (exchange.getFromEndpoint() != null) {
                DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri());
        return encode(pe);

    Basically, the problem lies on this line where false means: don't save properties.

    DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false);

    The headers and the body are the only ones stored on database.