Essentially - how is it possible to use Camel's Aggregator with a File based input?
By default, when an Exchange created by the File component's consumer completes its Route, the file is moved to $inputLocation/.camel/.
However, when using an Aggregator, an Exchange that has passed through the aggregation step carries on and completes the remainder of the Route after the aggregate block (which may or may not contain further steps). The Exchange therefore finishes the Route and, if a File Consumer was used, the file is moved to $inputLocation/.camel/.
At some later time the Aggregator will complete and when it does so the aggregated File references will be invalid, because the file was moved on disk when the regular Exchange completed the Route.
The following simple Route shows the issue. The referenced Processor doSomethingWithTheAggregatedFiles will find that the files in question do not exist at their stated location, as they have already been moved to workdirs/in/.camel/:
    @Override
    public void configure() throws Exception {
        from("file:workdirs/in/")
            .aggregate(constant(true), AggregationStrategies.groupedExchange())
                .completionTimeout(10*1000) // 10 seconds
                .process("doSomethingWithTheAggregatedFiles")
            .end()
            .log("file completed route: $simple{header[CamelFileName]}")
            .end();
    }
Is there something missing here? Is there no way to get it to "wait" until the aggregated exchange completes?
Looking at the ZipAggregationStrategy, I see that it doesn't suffer from this issue because it reads the content of each file during the aggregation step and adds it to the ZIP file, essentially taking a copy of the data as the aggregation step occurs. I would prefer not to take a copy, as my input may be large.
You could set the file consumer's noop option to true and implement a simple processor to move the files after your aggregation is complete.
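    .process(e -> {
        // File name as consumed, relative to the endpoint's directory
        String name = e.getIn().getHeader("CamelFileNameConsumed", String.class);
        // Resolve against the input directory used in the route above
        Path inputDir = Path.of("workdirs/in");
        Files.move(
            inputDir.resolve(name),
            inputDir.resolve(".camel").resolve(name)); // the variable, not the literal "name"
    })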
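As a rough sketch of the "move them yourself" step, the file handling can be pulled out into a plain Java helper that a processor could call once the aggregation has completed. The class and method names (ProcessedFileMover, moveToDoneDir) are made up for illustration, and the sketch assumes you have already collected the consumed file names, for example from the CamelFileNameConsumed header of each grouped exchange, and that the .camel directory lives directly under the input directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Camel: moves processed files into <inputDir>/.camel
final class ProcessedFileMover {
    private ProcessedFileMover() {}

    // fileNames are assumed to be relative to inputDir
    static List<Path> moveToDoneDir(Path inputDir, List<String> fileNames) throws IOException {
        Path doneDir = inputDir.resolve(".camel");
        Files.createDirectories(doneDir);
        List<Path> moved = new ArrayList<>();
        for (String name : fileNames) {
            Path target = doneDir.resolve(name);
            // A consumed name may include sub-directories, so make sure they exist
            Files.createDirectories(target.getParent());
            // Files.move returns the path of the target file
            moved.add(Files.move(inputDir.resolve(name), target,
                    StandardCopyOption.REPLACE_EXISTING));
        }
        return moved;
    }
}
```

With noop=true the File component leaves the files where they are, so nothing moves until this helper is called, and the aggregated references stay valid in the meantime.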
Another solution that might work for you is to force the Aggregator to run in the same thread as its caller:
.aggregate(constant(true), AggregationStrategies.groupedExchange())
.executorService(new SynchronousExecutorService())
...
This results in all aggregations completing prior to the File component being notified that the route is complete.
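To illustrate what "run in the same thread as its caller" means, here is a minimal plain-Java sketch of a caller-runs executor. It is not Camel's SynchronousExecutorService, only an illustration of the idea; the class name CallerRunsExecutor is made up. Because execute() simply runs the task inline, a submitted task has finished by the time the submitting call returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: an ExecutorService that runs every task on the calling thread
class CallerRunsExecutor extends AbstractExecutorService {
    private volatile boolean shutdown;

    @Override
    public void execute(Runnable task) {
        task.run(); // no hand-off to another thread: the caller blocks until the task is done
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return new ArrayList<>(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown; // tasks never outlive the call that submitted them
    }
}
```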