I am new to Apache Camel.
I need to split a file line by line and to do some operation on each lines. At the end I need a footer line with information from previous lines (number of lines and sum of the values of a column) My understanding is that I should be using an aggregation strategy, so I tried something like that:
.split(body().tokenize("\r\n|\n"), sumAggregationStrategy)
.process("fileProcessor")
In my aggregation strategy I just set two headers with the incremented values:
newExchange.getIn().setHeader("sum", sum);
newExchange.getIn().setHeader("numberOfLines", numberOfLines);
And in the processor I try to access those headers:
int sum = inMessage.getIn().getHeader("sum", Integer.class);
int numberOfLines = inMessage.getIn().getHeader("numberOfLines", Integer.class);
There are two problems.
First of all the aggregation strategy seem to be called after the first iteration of the processor.
Second, my headers don't exist in the processors, so I can't access the information I need when I am at the last line of the file. The headers do exist in the oldExchange of the aggregators though.
I think I can still do it, but I would have to create a new processor just for the purpose of making the last line of the file. Is there something I'm missing with the aggregation strategies ? Is there a better way to do this ?
An aggregator will be called for every iteration of the split. This is how they are supposed to work.
The reason you don't see the headers within the processor is, headers live and die with the message and not visible outside. You need to set the 'sum' and 'numberOfLines' as exchange properties instead. Because every iteration within a split results in an exchange, you need get the property from old exchange and set them again in the new exchange to pass them to subsequent components in the route.
This is how you could do,
AggregationStrategy:
public class SumAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
long sum = 0;
long numberOfLines = 0;
if(oldExchange != null) {
sum = (Long) oldExchange.getProperty("sum");
numberOfLines = oldExchange.getProperty("numberOfLines ");
}
sum = sum + ((Line)newExchange.getIn().getBody()).getColumnValue();
numberOfLines ++;
newExchange.setProperty("sum", sum);
newExchange.setProperty("numberOfLines",numberOfLines);
oldExchange.setProperty("CamelSplitComplete", newExchange.getProperty("CamelSplitComplete")); //This is for the completion predicate
return newExchange;
}
}
Route:
.split(body().tokenize("\r\n|\n"),sumAggregationStrategy)
.completionPredicate(simple("${exchangeProperty.CamelSplitComplete} == true"))
.process("fileProcessor").to("file:your_file_name?fileExist=Append");
Processor:
public class FileProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
long sum = exchange.getProperty("sum");
long numberOfLines = exchange.getProperty("numberOfLines");
String footer = "Your Footer String";
exchange.getIn().setBody(footer);
}
}