Apache Camel - send split results to CXF endpoint


I have a route that should process a request from a CXF endpoint and return the results as JSON:

public class MyRoute extends RouteBuilder
{

// ... Autowired:
// msgRequestProcessor - converts json {string,string} to String of "\n" delimited json string: {string}\n{string}
// RangeProcessor, SingleProcessor - create mongodb Criteria object from json string
// msgTypeMapper - adds corresponding header "msg.type"

@Override
public void configure()
{
    from("direct:list")
            .process(msgRequestProcessor)
            .split(body())
                .bean(msgTypeMapper.class)
                .choice()
                    .when(header("msg.type").isEqualTo("single"))
                        .log("Go to direct:single")
                        .to("direct:single")
                    .otherwise()
                        .log("Go to direct:range")
                        .to("direct:range")
            .end()
            .to("direct:aggregate");

    from("direct:range")
            .process(new RangeProcessor());

    from("direct:single")
            .process(new SingleProcessor());

    from("direct:aggregate")
            .aggregate(new MyAgg()).header("msg.collection").completionSize(2)
            .log("RETVAL: ${body}")
            .marshal().json(JsonLibrary.Gson).end();
}

public static final class MyAgg implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
    {
        if (oldExchange == null) {
            return newExchange;
        }

        Criteria oldCriteria = oldExchange.getIn().getBody(Criteria.class);
        Criteria newCriteria = newExchange.getIn().getBody(Criteria.class);

        Criteria criteria = new Criteria();
        criteria.andOperator(oldCriteria, newCriteria);
        oldExchange.getIn().setBody(criteria.getCriteriaObject().toString());

        return oldExchange;
    }
}

}

Everything works fine: I can see the correct aggregation results and the aggregation completion in the log. However, the CXF endpoint always returns the output of msgRequestProcessor (from before the split):

{"string"}
{"string"}

while I expect to see the Criteria object converted to a string (which I can see in the logs).

Any help would be much appreciated! Thanks.


Solution

  • Note first that your indentation is misleading: the end() really closes the choice(), not the split(). This puzzled me for a while (as it perhaps did @Ralf).

    The aggregation itself works, but its result is discarded, because the result of the split is the input message.

    For request/reply (in-out) use of the splitter, you have to declare the aggregation strategy together with the split(), as explained here (same misleading indentation).

    In the official documentation you mention, the situation is the other way around (in-only): the result of the splitter is discarded, and the result of the aggregation is routed downstream.
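
    As a rough sketch of what declaring the strategy along with the split() could look like, the first route might be rewritten as below. This is an untested fragment meant to replace the "direct:list" route inside configure(). It assumes the two-argument split(Expression, AggregationStrategy) overload of the Java DSL and reuses MyAgg from the question as the splitter's strategy. Dropping the separate "direct:aggregate" route and sending the otherwise() branch to "direct:range" are also assumptions about the intended behaviour.

    ```java
    // Fragment for MyRoute.configure(); relies on the fields and classes from the question.
    from("direct:list")
        .process(msgRequestProcessor)
        // Declare the aggregation strategy on the splitter itself, so the
        // aggregated exchange (not the original input) becomes the reply.
        .split(body(), new MyAgg())
            .bean(msgTypeMapper.class)
            .choice()
                .when(header("msg.type").isEqualTo("single"))
                    .log("Go to direct:single")
                    .to("direct:single")
                .otherwise()
                    .log("Go to direct:range")
                    .to("direct:range")
            .end() // closes choice()
        .end()     // closes split(); the body is now the result produced by MyAgg
        .log("RETVAL: ${body}")
        .marshal().json(JsonLibrary.Gson);
    ```

    With this layout the splitter calls MyAgg once per split part and hands the combined exchange back to the caller, so no completionSize or correlation header is needed. Note that MyAgg as written only turns the body into a string on its second call, so it still assumes exactly two parts.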