java apache-camel spring-camel

Is there a way to split an exchange body (a List) into chunks instead of one-element exchanges?


Is there a way to split the body of an exchange (a List) into chunks bigger than one element per exchange?

When processing big files, I end up with a huge number of one-line exchanges and a lot of memory consumption.

        from(url.concat("RAW(" + filesToInclude + ")"))
                .log("Start processing file: ${header.CamelFileName}")
                .routeId(CamelRoutes.LIST_ROUTE_ID + endpointConfig.getConfigId())
                .setHeader(CamelHeaders.ENDPOINT_CONFIG, constant(endpointConfig))

                .unmarshal(getCsvDataFormat(endpointConfig))

                .split(body())
                    .streaming().parallelProcessing().executorService(Executors.newFixedThreadPool(CONCURRENT_THREADS_NUMBER))
                    .stopOnException().stopOnAggregateException()
                    .marshal().json(JsonLibrary.Jackson)
                    .log("Starting aggregating for batch processing")
                    .aggregate(header(Exchange.FILE_NAME), new ListAggregationStrategy())
                        .completionPredicate(new BatchSizePredicate(endpointConfig.getMaxBatchSize()))
                        .completionTimeout(endpointConfig.getBatchTimeout())
                        .log("Start one batch processing")
                        .bean(rowCamelService, "saveInDB")
                        .log("Finished one batch processing")
                    .end()
                    .log("Finished aggregating for batch processing")
                    .log("Finished processing file: ${header.CamelFileName}")
                .end();

For a 60 MB CSV file, it takes over 1 GB of memory. I need to bring that down to 500 MB at most.


Solution

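  • You can use setBody(...) to partition the list into batches before splitting, so each split exchange carries up to BATCH_SIZE rows instead of a single row

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.camel.builder.RouteBuilder;

    public class MyRouteBuilder extends RouteBuilder {
       private static final int BATCH_SIZE = 1000;

       // Partitions the list into sublists of at most batchSize elements; see https://www.baeldung.com/java-list-split
       private <T> List<List<T>> splitIntoBatches(List<T> list, int batchSize) {
          if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
          List<List<T>> batches = new ArrayList<>();
          int start = 0;
          while (start < list.size()) {
             int end = start + Math.min(batchSize, list.size() - start);
             batches.add(list.subList(start, end)); // a view, rows are not copied
             start = end;
          }
          return batches;
       }

       @Override
       public void configure() {
          from(...)
             // Replace the list of rows with a list of batches of up to BATCH_SIZE rows
             .setBody(exchange -> {
                List<?> rows = exchange.getMessage().getBody(List.class);
                return splitIntoBatches(rows, BATCH_SIZE);
             })
             // Each split exchange now carries one batch instead of a single row
             .split(body())
             .streaming()
             ...
       }
    }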
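To check the partitioning idea on its own, outside Camel, here is a minimal JDK-only sketch. The class name `BatchPartitionDemo`, the method name `partition` and the sample rows are hypothetical, chosen for illustration; each row is modeled as a `List<String>`, roughly what a CSV unmarshal hands to the splitter. With 2,500 rows and a batch size of 1000 you get three batches (1000, 1000, 500), so the splitter would create three exchanges instead of 2,500.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchPartitionDemo {

    // Splits `list` into consecutive sublists of at most `batchSize` elements.
    // The last batch holds the remainder, so it may be smaller than batchSize.
    public static <T> List<List<T>> partition(List<T> list, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<List<T>> batches = new ArrayList<>();
        int size = list.size();
        int start = 0;
        while (start < size) {
            // Computed this way so start + batchSize can never overflow an int
            int end = start + Math.min(batchSize, size - start);
            // subList is a view backed by `list`; no elements are copied
            batches.add(list.subList(start, end));
            start = end;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Hypothetical CSV rows: each row is a List<String>
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            rows.add(Arrays.asList("id-" + i, "value-" + i));
        }
        List<List<List<String>>> batches = partition(rows, 1000);
        System.out.println(batches.size() + " batches"); // prints "3 batches"
        for (List<List<String>> batch : batches) {
            System.out.println("batch of " + batch.size() + " rows, first row " + batch.get(0));
        }
    }
}
```

Because `subList` returns views backed by the original list, building the batches does not copy the rows themselves.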