Search code examples
split apache-camel splitter

Skip the first line of a CSV using the Camel EIP splitter, process all other lines, and aggregate all lines including the skipped line


Is there a simple way to skip the first line (the header) of a CSV using the Camel EIP splitter, process all the other lines, and then aggregate all lines, including the skipped one? I need to convert the date in each record of a CSV file, but skip the first line, which is the header. I'm trying to use the Camel EIP splitter. Thank you!

<!-- Route: reads from the predixConsumer endpoint, converts the body to a String, splits it
     on the "\n" token and logs every resulting line, then forwards to the endpoint configured
     by the fileDestinationEndpoint property. -->
<route 
    id="core.predix.consumer.route"
    autoStartup="true" >
    <from id="predixConsumer" ref="predixConsumer" />   
    <convertBodyTo type="java.lang.String" />
    <!-- No aggregation strategy is active here: the strategyRef and stopOnException attributes
         on the split below are commented out. -->
    <split streaming="true" > <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true"> -->
        <!-- One sub-message per line; the first (header) line is not treated specially. -->
        <tokenize token="\n"/> 
        <log message="Split line ${body}"/>
            <!-- Per-line processor, currently disabled. -->
            <!--  <process ref="EnrichementProcessor"/> -->
    </split>
    <to uri="{{fileDestinationEndpoint}}" />
</route>

Solution

  • A little late but...

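    Put the header-skipping logic in an aggregation strategy: let the splitter process every message, and drop the repeated header line when the split results are aggregated ...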

     <!-- Route: consumes from seda:postFlightReportProducer, splits the body, renders each part
          through a Velocity template, aggregates the parts with statusAggregationStrategy,
          wire-taps the aggregated body to a .csv file name and, when the body is not null,
          runs xlsxProcessor and wire-taps the result to an .xlsx file name. -->
     <route
        id="fadec.core.PostFlightReportProducerRoute"
        autoStartup="true">
        <from uri="seda:postFlightReportProducer" />
         <!-- Split results are combined by the bean referenced as statusAggregationStrategy
              (presumably the StatusAggregationStrategy class shown below; confirm the bean wiring). -->
         <split streaming="true" strategyRef="statusAggregationStrategy">
         <!-- The split expression is the message body itself. -->
         <simple>${body}</simple>
            <log message="inbound Post Flight Summary Report message body: ${body}" />
            <process ref="postFlightReportMarshaler" />
            <to uri="velocity:templates/postFlightReportSummary.vm" />
            <log message="Velocity output: ${body}" loggingLevel="INFO" /> 
         </split>
     <!-- NOTE(review): the URI is the DFEndpointTest property followed directly by "fileName=";
          this assumes the property value already ends with "?" or "&" (confirm).
          This tap runs before the null check below. -->
     <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.csv" /> 
         <choice>
            <when>
                <!-- Guard: log and stop routing when there is no body. -->
                <simple>${body} == null</simple>
                <log message="body is NULL, do not send NULL body!" />
                <stop></stop>
            </when>
            <otherwise>
                <!-- Runs xlsxProcessor on the body, then taps the result to the .xlsx file name. -->
                <process ref="xlsxProcessor"/>
                <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
                <log message="Sending data packet: ${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
                <stop></stop>             
            </otherwise>
        </choice>
     </route>
    
    public class StatusAggregationStrategy implements AggregationStrategy {
    
        // Logger used by aggregate(...) to trace each aggregation step.
        private Logger log = LoggerFactory.getLogger(StatusAggregationStrategy.class.getName());
    
        /**
         * Merges the String body of each incoming exchange into one accumulated body.
         *
         * <p>The first exchange to arrive ({@code oldExchange == null}) is kept whole, so its
         * header line is preserved. For every later exchange the body is read in pairs of
         * lines: the first line of each pair is treated as a repeated header and dropped, the
         * second is appended to the accumulated body followed by the platform line separator.
         *
         * <pre>
         * Arrived | oldExchange | newExchange | Description
         * A       | null        | A           | first message arrives
         * B       | A           | B           | second message arrives
         * C       | AB          | C           | third message arrives
         * </pre>
         *
         * @param oldExchange the exchange accumulated so far, or {@code null} on the first call
         * @param newExchange the exchange that has just arrived
         * @return {@code newExchange} on the first call, otherwise {@code oldExchange} with the
         *         new data records appended to its body
         */
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            log.info("Status Aggregation Strategy :: start");

            if (oldExchange == null) {
                // First message: keep it intact (header included), normalised to a String body.
                log.info("old Exchange is Null");
                String body = newExchange.getIn().getBody(String.class);
                newExchange.getIn().setBody(body);
                return newExchange;
            }

            // Each new message is expected to carry two records: a header and a data record.
            String newBody = newExchange.getIn().getBody(String.class);
            String existingBody = oldExchange.getIn().getBody(String.class);

            log.info("New Body exchange: {}", newBody);
            log.info("Old Body exchange: {}", existingBody);

            if (newBody == null) {
                // Nothing to merge; a null source would make the Scanner constructor throw.
                log.error("bad newBody message exchange, has no data record!");
                log.debug("Status Aggregation Strategy :: finish");
                return oldExchange;
            }

            StringBuilder dataRecords = new StringBuilder();
            // hasNextLine()/nextLine() find line breaks themselves and do not consult the
            // scanner's delimiter, so no custom delimiter is configured.
            try (Scanner lines = new Scanner(newBody)) {
                while (lines.hasNextLine()) {
                    String skippedRecord = lines.nextLine();
                    log.info("aggregation: skip record: {}", skippedRecord);
                    if (lines.hasNextLine()) {
                        String addedRecord = lines.nextLine();
                        log.info("aggregation addRecord: {}", addedRecord);
                        dataRecords.append(addedRecord).append(System.lineSeparator());
                    } else {
                        log.error("bad newBody message exchange, has no data record!");
                    }
                }
            }

            log.info("Joined exchange: Old body: {} New body: {}", existingBody, dataRecords);

            // A null accumulated body must not be concatenated as the literal text "null".
            StringBuilder joined = new StringBuilder(existingBody == null ? "" : existingBody);
            if (joined.length() > 0 && dataRecords.length() > 0) {
                char last = joined.charAt(joined.length() - 1);
                if (last != '\n' && last != '\r') {
                    // Keep the first appended record from fusing with an unterminated last line.
                    joined.append(System.lineSeparator());
                }
            }
            joined.append(dataRecords);
            oldExchange.getIn().setBody(joined.toString());

            log.debug("Status Aggregation Strategy :: finish");

            return oldExchange;
        }