Search code examples
apache-camelrestrictionsaggregatorfilesplitting

How to aggregate exchanges without restrictions


We have current situation. I use Apache Camel, split large file for small exchanges (using splitter, see below) and validate them. Then I need aggregate messages, but I use aggregator and it requires setup complition size or another. Can I aggragate all exchanges from current document without setting restrictions?

My Route:

 from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}")
                .transacted()
                .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator"))
                .streaming()
                .process(new ValidatorProcessor())
                .choice()
                .when(new Predicate() {
                    @Override
                    public boolean matches(Exchange exchange) {
                        return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS);
                    }
                })
                .to("jpa:/...")
                .otherwise()
                .aggregate(body(String.class), new MyAggregationStrategy()).completionSize(????)
                .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}");

To set aggregator I use to set count of exchanges or time, but I don't know how many exchanges will be.


Solution

  • So splitter EIP in Camel generates a header called CamelSplitComplete every time it finishes splitting an exchange. This header is a boolean value.

    What I would do is use completionPredicate() in the aggregator instead of completionSize(). So whenever that header is true it will finish aggregating:

    from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}")
        .transacted()
        .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator"))
        .streaming()
        .process(new ValidatorProcessor())
        .choice()
        .when(new Predicate() {
                    @Override
                    public boolean matches(Exchange exchange) {
                        return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS);
                    }
                })
         .to("jpa:/...")
         .otherwise()
         .aggregate(body(String.class), new MyAggregationStrategy()).completionPredicate(header("CamelSplitComplete") == true)
         .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}");
    

    I hope this is what you are looking for.