I have a task involving CSV files. I need to limit the size of each CSV because the backend engine has constraints on payload sizes.
The problem is extracting the header (the first record/row), saving it off, and adding it back to each chunk of the split data, so that the multiple output files all carry the same header. I was hoping to find an elegant way of handling this. What I have works, but it is, well, less than desirable coding.
Also, I need the groupBy parameter to be programmable; I am trying to find out now whether it can be set by a property in the camelContext.
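Ideally I want to drive the group size from a property, something like the sketch below (the property name csv.split.group and the properties file are made up, and I have not been able to confirm that the tokenize group attribute resolves {{ }} placeholders on my Camel version):
<!-- app.properties (assumed) would contain: csv.split.group=50 -->
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <propertyPlaceholder id="properties" location="classpath:app.properties" />
    <route>
        <from uri="{{fileEntranceEndpoint}}" />
        <split streaming="true">
            <!-- hoping group can come from a property instead of being hard-coded -->
            <tokenize token="\n" group="{{csv.split.group}}" />
            <to uri="file://{{target.folder}}" />
        </split>
    </route>
</camelContext>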
This is what I have; it works, but I can't get the groupBy to accept a parameter.
My route
<!-- route on Weekends -->
<route id="inRouteWkEndBfmt1" routePolicyRef="startPolicyWkEnd" autoStartup="false">
    <from id="mainProcessingRouteWkEnd" ref="AsciiGatewayBackfillmt1" />
    <convertBodyTo type="java.lang.String" />
    <log message="File ${file:name} was received." />
    <setHeader headerName="messageDateTime">
        <simple>${date:now:MM-dd-yyyy-HH:mm:ss}</simple>
    </setHeader>
    <split streaming="true">
        <tokenize token="\n" group="50" />
        <log message="Split line Body: ${body}" />
        <process ref="asciiSplitterProcessor" />
        <log loggingLevel="INFO" message="Successfully sent ${file:name} to MT1 Core for Analytics Observation." />
        <to id="windowsShareTargetWkEnd" uri="file://{{target.folder}}" />
    </split>
    <process ref="asciiCleanUp" />
</route>
Code
// relies on instance fields: int cntr, boolean hdrFlag, String HEADER
public void process(Exchange exchange) throws Exception {
    log.info("Ascii Splitter Processor :: start");
    String inBody = exchange.getIn().getBody(String.class);

    // rename the split part: original prefix + "_" + running counter + original suffix
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    String fileSuffix = fileName.substring(fileName.lastIndexOf("."));
    String filePrefix = fileName.substring(0, fileName.lastIndexOf("."));
    fileName = filePrefix + "_" + cntr + fileSuffix;
    exchange.getIn().setHeader("CamelFileName", fileName);
    cntr++;

    fileName = (String) exchange.getIn().getHeader("CamelFileName");
    log.info("File being processed: " + fileName);
    log.debug("Message record: " + inBody);

    StringBuilder sb = new StringBuilder();
    Scanner sc = new Scanner(inBody);
    if (!hdrFlag) {
        // first chunk: remember the header line and pass the chunk through unchanged
        while (sc.hasNextLine()) {
            String record = sc.nextLine();
            log.debug("record: " + record);
            log.debug("HEADER FLAG: " + hdrFlag);
            if (!hdrFlag) {
                HEADER = record + "\r\n";
                hdrFlag = true;
                log.debug("HEADER: " + HEADER);
            }
            sb.append(record).append("\r\n");
        }
    } else {
        // subsequent chunks: prepend the saved header
        sb.append(HEADER).append(inBody);
    }
    sc.close();
    exchange.getIn().setBody(sb.toString());
}
I think this is a bit more elegant than the above. Unfortunately I'm not on Camel 2.9, but this works for sub units of work that need to be joined on the server side for large CSV payloads; I'm converting to JSON and sending to the server.
Thanks everyone, hope this helps someone else with this use case.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// class name and Processor interface are mine; this is the bean registered as "extractHeader" in the route below
public class ExtractHeaderProcessor implements Processor {

    private static final Logger log = LoggerFactory.getLogger(ExtractHeaderProcessor.class);

    private String HEADER = "";
    private int fileCounter = 0;

    @Override
    public void process(Exchange exchange) throws Exception {
        log.info("Entering Extract Header Processor ...");

        // if the file is split into multiple files, modify the name with an index
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        String fileSuffix = fileName.substring(fileName.lastIndexOf("."));
        String filePrefix = fileName.substring(0, fileName.lastIndexOf("."));
        fileName = filePrefix + "_" + fileCounter + fileSuffix;
        fileCounter++;
        //fileName = filePrefix + "_" + Integer.valueOf((int) exchange.getProperty("CamelSplitSize")) + fileSuffix; // need camel 2.9 for this to work, bummer
        exchange.getIn().setHeader("CamelFileName", fileName);
        log.info(" FILE NAME: " + exchange.getIn().getHeader("CamelFileName", fileName));
        //log.info("File Counter: " + Integer.valueOf((int) exchange.getProperty("CamelSplitSize"))); // need camel 2.9 for this to work, bummer
        log.info("File Counter: " + fileCounter);

        // if this is the first split body, capture the header to attach to the other split bodies
        String body = exchange.getIn().getBody(String.class);
        StringBuilder sb = new StringBuilder();
        if (Integer.valueOf((int) exchange.getProperty("CamelSplitIndex")) == 0) {
            List<String> serviceRecords = new ArrayList<String>(Arrays.asList(body.split(System.lineSeparator())));
            HEADER = getHeader(serviceRecords).toString();
            exchange.getIn().setBody(body);
        } else {
            sb.append(HEADER).append(System.lineSeparator()).append(body);
            exchange.getIn().setBody(sb.toString());
        }
        log.debug("HEADER: " + HEADER);
        log.info("Exiting Extract Header Processor ... :: Finish");
    }

    public StringBuilder getHeader(List<String> serviceRecords) {
        StringBuilder sb = new StringBuilder();
        // only the first record is the header; note the loop starts copying at j = 1,
        // so the first field of the header line is dropped
        String firstRecord = serviceRecords.get(0);
        log.debug("record: : " + firstRecord);
        String[] sa = firstRecord.split(",");
        for (int j = 0; j < sa.length; ++j) {
            if (j != 0) {
                sb.append(sa[j]).append(",");
            }
        }
        sb.deleteCharAt(sb.lastIndexOf(","));
        return sb;
    }

    public void cleanHeader() {
        HEADER = "";
        fileCounter = 0;
    }
}
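One more possible cleanup (just a sketch, not what I'm running): since CamelSplitIndex is already available inside the split, the renaming lines in process() could use the exchange property instead of the stateful fileCounter, which would also make the counter reset in cleanHeader unnecessary:
// sketch: name the part from the split index instead of an instance counter
int partIndex = exchange.getProperty(Exchange.SPLIT_INDEX, 0, Integer.class);
String partFileName = filePrefix + "_" + partIndex + fileSuffix;
exchange.getIn().setHeader(Exchange.FILE_NAME, partFileName);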
Route
<route id="core.accept.file.type.route" autoStartup="true">
    <from uri="{{fileEntranceEndpoint}}" />
    <choice>
        <when>
            <simple>${header.CamelFileName} regex '^.*\.(csv|CSV)$'</simple>
            <log message="${file:name} accepted for processing..." />
            <choice>
                <when>
                    <simple>${header.CamelFileName} regex '^.*\.(CSV)$'</simple>
                    <setHeader headerName="CamelFileName">
                        <simple>${file:name.noext}.csv</simple>
                    </setHeader>
                </when>
            </choice>
            <split streaming="true">
                <tokenize token="\n" group="600" />
                <log message="Split Group Body: ${body}" />
                <to uri="bean:extractHeader" />
                <to id="acceptedFileType" ref="pConsumer" />
            </split>
            <to uri="bean:extractHeader?method=cleanHeader" />
            <!-- <to id="acceptedFileType" ref="pConsumer" /> -->
        </when>
        <otherwise>
            <log message="${file:name} is an unknown file type, sending to unhandled repo." loggingLevel="INFO" />
            <to uri="{{unhandledArchive}}" />
        </otherwise>
    </choice>
</route>
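For completeness, extractHeader is just a bean in the Spring context; registration looks something like this (the class and package name here are placeholders for whatever the processor class is actually called):
<bean id="extractHeader" class="com.example.ExtractHeaderProcessor" />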