apache-camel, splitter

Can the Camel splitter skip messages/rows with some value, like empty or null?


I have a Camel route on the ingress of received files. Sometimes these files contain many, possibly thousands of, empty rows or records, which occur at the end of the files.

Any help or advice on how to handle this situation would be appreciated. This is what the end of such a file looks like:

    2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
    ,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,

The route goes to a splitter.

    <route autoStartup="true" id="core.predix.accept.file.type.route">
        <from id="_from3" uri="{{fileEntranceEndpoint}}"/>
        <convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
        <split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
            <tokenize token="\n"/>
            <process id="_process3" ref="toCsvFormat"/>
            <!-- pass-through only; we do not allow embedded commas in numeric data -->
        </split>
        <log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
        <choice id="_choice1">
            <when id="_when1">
                <simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
                <log id="_log2" message="${file:name} accepted for processing..."/>
                <choice id="_choice2">
                    <when id="_when2">
                        <simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
                        <setHeader headerName="CamelFileName" id="_setHeader1">
                            <simple>${file:name.noext.single}.csv</simple>
                        </setHeader>
                        <log id="_log3" message="${file:name} changed file name."/>
                    </when>
                </choice>
                <split id="_split2" streaming="true">
                    <tokenize prop:group="noOfLines" token="\n"/>
                    <log id="_log4" message="Split Group Body: ${body}"/>
                    <to id="_to1" uri="bean:extractHeader"/>
                    <to id="acceptedFileType" ref="predixConsumer"/>
                </split>
                <to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
            </when>
            <otherwise id="_otherwise1">
                <log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
                <to id="_to3" uri="{{unhandledArchive}}"/>
            </otherwise>
        </choice>
    </route>

The simple aggregator

    public class CsvAggregationStrategy implements AggregationStrategy {

        private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

            //Theory
            //-------------------------------------------------------------------------------------
            // Arrived   | oldExchange  |  newExchange  | Description
            //-------------------------------------------------------------------------------------
            // A         | NULL         |  A            | first message arrives for the first group
            // B         | A            |  B            | second message arrives for the first group
            // F         | NULL         |  F            | first message arrives for the second group
            // C         | AB           |  C            | third message arrives for the first group
            //-------------------------------------------------------------------------------------
            log.debug("Aggregation Strategy :: start");

            if ( newExchange.getException() != null ) {
                if ( oldExchange == null ) {
                    return newExchange;
                } else {
                    oldExchange.setException(newExchange.getException());
                    return oldExchange;
                }
            }

            if ( oldExchange == null ) {  //This will set the 1st record with the Header
                return newExchange;
            }

            String newBody = newExchange.getIn().getBody(String.class);
            String oldBody = oldExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody( body );

            log.debug("Aggregation Strategy :: finish");
            return oldExchange;

        } //Exchange process
    } //class AggregationStrategy

I thought I would handle the empty rows in the ToCsvFormat class.

The ToCsvFormat class simply changes the inbound CSV delimiter to a comma.

    public class ToCsvFormat implements Processor {

        private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);

        @Override
        public void process(Exchange exchange) throws Exception {

            String body = exchange.getIn().getBody(String.class);

            body = body.replaceAll("\\t|;", ",");

            String bodyCheck = body.replaceAll(",", "").trim();
            LOG.info("BODY CHECK: " + bodyCheck);
            if ( bodyCheck == null || bodyCheck.isEmpty() ) {

                throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");

            } else {

                StringBuilder sb = new StringBuilder(body.trim());

                LOG.debug("CSV Format Body In: " + sb.toString());
                LOG.debug("sb length: " + sb.length());

                if ( sb.toString().endsWith(",") ) {
                    // drop the trailing delimiter
                    sb.deleteCharAt(sb.length() - 1);
                }

                LOG.info("CSV Format Body Out: " + sb.toString());
                sb.append(System.lineSeparator());
                exchange.getIn().setBody(sb.toString());
            }
        }
    }

The problem I'm having is that I need the splitter to keep processing until it reaches all the empty rows, or to skip over them, or to stop on empty records, but I need to keep whatever was previously split and processed. Throwing and catching an exception stops the splitter and I get nothing. I'm using the splitter's stopOnException, but, like it says, it stops on the exception.

Thank you.


Solution

  • So you set up stopOnException=true and ask why your route stops when the exception isn't caught =) ? As a workaround, forget about throwing an exception: validate your body, and if it contains inappropriate data just set an empty body and then sum the bodies in your AggregationStrategy, as in the pseudo-route below. I haven't used the XML description for a very long time, so I hope you will understand this example with the Java DSL.

    public class ExampleRoute extends RouteBuilder {

        AggregationStrategy aggregationStrategy = new AggregationStrategy() {
            @Override
            public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
                log.debug("Aggregation Strategy :: start");
                if (oldExchange != null) {
                    // append the new row to what has been aggregated so far
                    newExchange.getIn().setBody(oldExchange.getIn().getBody(String.class)
                            + newExchange.getIn().getBody(String.class));
                }
                log.debug("Aggregation Strategy :: finish");
                return newExchange;
            }
        };

        @Override
        public void configure() throws Exception {
            from("{{fileEntranceEndpoint}}")
                    .convertBodyTo(String.class)
                    .split(body().tokenize("\n"), aggregationStrategy).streaming().stopOnException()
                        .choice()
                            .when(body().regex(",+$"))
                                // blank row: contribute nothing to the aggregate
                                .setBody(constant(""))
                            .otherwise()
                                .process("toCsvFormat")
            ;
        }
    }
    

    I recommend you use the Java DSL. As you can see, many things are easier to do with it.
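
    Since your XML route already wires in a custom csvAggregationStrategy, another option is to drop the blank rows inside the aggregation strategy itself instead of blanking them in the route. The sketch below is untested; it assumes Camel 2.x imports and that ToCsvFormat no longer throws on empty rows, and the class name SkipBlankRowsAggregationStrategy is just illustrative. Returning the old exchange for a comma-only row keeps everything aggregated so far and lets the splitter run to the end of the file:

    import org.apache.camel.Exchange;
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    public class SkipBlankRowsAggregationStrategy implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            String newBody = newExchange.getIn().getBody(String.class);

            // Treat rows that contain nothing but commas and whitespace as blank.
            boolean blank = newBody == null || newBody.replace(",", "").trim().isEmpty();

            if (blank) {
                if (oldExchange != null) {
                    return oldExchange;           // keep what was aggregated so far, drop the blank row
                }
                newExchange.getIn().setBody("");  // file starts with a blank row: begin with an empty aggregate
                return newExchange;
            }

            if (oldExchange == null) {
                return newExchange;               // first real row, keeps the header
            }

            String oldBody = oldExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + newBody);
            return oldExchange;
        }
    }

    Registered as a bean under the same id, the existing <split strategyRef="csvAggregationStrategy" ...> in your XML route would pick it up without any other changes.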