Tags: java, apache-camel, spring-camel, camel-ftp

Apache Camel: Process file line by line


I have a large file that I want to read and process. I don't want to load it into memory all at once; instead, I want to read each line separately and perform actions on it. This is the implementation I have come up with so far:

@Override
public void configure() {
    from(fileSftpLocationUrl)
        .routeId("my-route")
        .onException(Exception.class).handled(true).bean(exchangeErrorHandler, "processError").end()
        .split(body().tokenize("\n")).streaming()
        .filter(/*condition for skip first and last line*/)
        .bean(/*my action*/)
        .to(String.format("activemq:%s", myQueue));
}

Before reading the file I skip the header and footer with .filter(/*condition for skip first and last line*/), and I try to read the file line by line with .split(body().tokenize("\n")).streaming(). But something goes wrong: I receive the entire contents of the file at once. I see the problem in .bean(/*my action*/), when it parses the data and acts on it.

I think the problem lies at the beginning, because the algorithm looks strange: first I describe the condition for the whole file (skip the header and footer), then I ask Camel to process it line by line, and only then do I define the action for a specific line.

My question is: how do I change this implementation so that the file is processed line by line?


Solution

  • I think I got it. By default, the split result is sent only to the first endpoint that follows the split:

    from(...)
        .split(body().tokenize("\n")).streaming()
        .to("direct:processLine");
    

    If you want to send each part through more complex routing, you have to mark where the split ends, e.g.:

    from(...)
        .split(body().tokenize("\n")).streaming()
            .filter(/*condition for skip first and last line*/)
            .bean(/*my action*/)
            .to(String.format("activemq:%s", myQueue))
        .end()
        .log("Split done");
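
    The filter condition itself is elided in the question. As a rough illustration of what "skip first and last line" involves when lines arrive one at a time, here is a minimal plain-Java sketch. It does not use Camel, and the class and method names are hypothetical. The point is that a line can only be recognised as the last one after trying to read the next. In a Camel route, the splitter is documented to expose the exchange properties `CamelSplitIndex` and `CamelSplitComplete`, which a filter could test instead; treat that as an assumption to verify against your Camel version.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical helper, not part of Camel: streams lines and skips header and footer.
public class SkipFirstAndLast {

    /** Passes every line except the first and the last to the given action. */
    static void forEachBodyLine(BufferedReader reader, Consumer<String> action) {
        try {
            // Read and discard the header; stop if the input is empty.
            if (reader.readLine() == null) {
                return;
            }
            String current = reader.readLine();
            String next;
            // Look one line ahead: 'current' is the footer when nothing follows it.
            while (current != null && (next = reader.readLine()) != null) {
                action.accept(current);
                current = next;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        BufferedReader reader =
                new BufferedReader(new StringReader("HEADER\nline 1\nline 2\nFOOTER"));
        // Only "line 1" and "line 2" reach the action.
        forEachBodyLine(reader, line -> System.out.println("processing " + line));
    }
}
```

    Only two lines are held in memory at any time, which matches the goal of not loading the whole file.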
    

    If you omit the end(), the logic becomes the following (note the indentation):

    from(...)
        .split(body().tokenize("\n")).streaming()
            .filter(/*condition for skip first and last line*/)
        .end() // Implicit
        .bean(/*my action*/)
        .to(String.format("activemq:%s", myQueue));
    

    So in your attempt, bean(...) was invoked with the original message, after the split had been performed.

    Think of it as a kind of for loop:

    for (String line : lines)
        filter(line);      // only this statement belongs to the loop
    bean.run(lines);       // runs once, after the loop, on the original message
    sendto(...);
    

    is not the same at all as:

    for (String line : lines) {
        filter(line);
        bean.run(line);    // runs for every line
        sendto(...);
    }
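
    To make the analogy concrete, here is a small runnable sketch in plain Java. It does not use Camel; the class and method names are made up for illustration, and the "bean" and "filter" steps are just log entries. It only shows the scoping difference described above: without a block, the bean step runs once on everything, and with a block it runs once per line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of the loop analogy; no Camel classes involved.
public class SplitScopeDemo {

    /** Like the route without end(): only the filter step is inside the loop. */
    static List<String> withoutBlock(List<String> lines) {
        List<String> log = new ArrayList<>();
        for (String line : lines)
            log.add("filter:" + line);             // the only statement in the loop
        log.add("bean:" + String.join("|", lines)); // runs once, sees the whole input
        return log;
    }

    /** Like the route with end(): every step runs for each line. */
    static List<String> withBlock(List<String> lines) {
        List<String> log = new ArrayList<>();
        for (String line : lines) {
            log.add("filter:" + line);
            log.add("bean:" + line);               // runs once per line
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b");
        System.out.println(withoutBlock(lines)); // [filter:a, filter:b, bean:a|b]
        System.out.println(withBlock(lines));    // [filter:a, bean:a, filter:b, bean:b]
    }
}
```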