I would like to log an error when an exception occurs and continue with the next record/split, but it does not work.
I tried the onException() and doTry() DSL, but neither works and the exchange still goes to the ErrorHandler.
onException(IOException.class)
    .handled(true)
    .process(exchange -> log.error("error!!"));

from("file:" + rootDir + "/" + account + "/inbox/?move=.done")
    .unmarshal(csvDataFormat)
    .split(body()).shareUnitOfWork().parallelProcessing().streaming()
        .process(fileService)
    .end();
Logs:
2018-07-18 14:01:59.883 DEBUG 45137 --- [/test1/request/] o.a.camel.processor.MulticastProcessor : Parallel processing failed due IOException reading next record: java.io.IOException: (line 4) invalid char between encapsulated token and delimiter
2018-07-18 14:01:59.885 ERROR 45137 --- [/test1/request/] o.a.camel.processor.DeadLetterChannel : Failed delivery for (MessageId: ID-**********-local-1531936914834-0-3 on ExchangeId: ID-*********-local-1531936914834-0-4). On delivery attempt: 0 caught: java.lang.IllegalStateException: IOException reading next record: java.io.IOException: (line 4) invalid char between encapsulated token and delimiter
@Bedla, thank you for your input. I found the following working for my use case:
- onException() was still sending the exchange to the DeadLetterChannel, so I had to use doTry().
- CsvDataFormat with maps: I couldn't modify csvDataFormat inside the processor, so I had to read the header from the file and prepend the CSV header to the body of each split using setBody.
Full Route Definition:
CsvDataFormat csvDataFormat = new CsvDataFormat().setUseMaps(true);

from("file:" + rootDir + "/test/")
    .log(LoggingLevel.INFO, "Start processing ${file:name}")
    .unmarshal().pgp(pgpFileName, pgpUserId, pgpPassword)
    .process(exchange -> { /* just to get csv header */
        InputStream inputStream = exchange.getIn().getBody(InputStream.class);
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
            String header = bufferedReader.readLine();
            exchange.getIn().setHeader("CSV_HEADER", header);
            csvDataFormat.setHeader(header.split(",")); //<- this does not work, so had to add in body below!
            System.out.println("csvHeader is : " + header);// + " ? " + Arrays.asList(csvDataFormat.getHeader()));
        }
    })
    .split(body().tokenize("\n")).shareUnitOfWork()
    .parallelProcessing().streaming()
        // prepend the CSV header so each split line can be unmarshalled on its own
        .setBody(exchange -> exchange.getIn().getHeader("CSV_HEADER") + "\n" + exchange.getIn().getBody())
        .doTry()
            .unmarshal(csvDataFormat)
            .process(requestFileService)
        .doCatch(IOException.class)
            //TODO: custom processing here...
            .process(exchange -> log.error("caught in dotry: " + exchange.getIn().getBody())).stop()
        .end()//end try/catch
        .choice()
            .when(simple("${property." + Exchange.SPLIT_COMPLETE + "} == true"))
                .log(LoggingLevel.INFO, "Finished processing ${file:name}")
    .end();
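
To illustrate why splitting by line first and parsing each line inside its own try/catch lets the route continue, here is a minimal plain-Java sketch of the same idea without Camel. The class and method names (PerLineCsvSketch, parseLine, parseAll) are hypothetical, and the parser is a deliberately simplified stand-in for the real CSV library (no escaped quotes, no multi-line fields), so treat it as an illustration of the control flow only.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of Camel or Commons CSV.
class PerLineCsvSketch {

    /** Simplified parser for ONE line: comma-separated, optional double-quoted fields. */
    static List<String> parseLine(String line) throws IOException {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        boolean closedQuote = false; // true right after a closing quote
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    inQuotes = false;
                    closedQuote = true;
                } else {
                    current.append(c);
                }
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
                closedQuote = false;
            } else if (closedQuote) {
                // Same kind of failure as in the log above: data after a closing quote.
                throw new IOException("invalid char between encapsulated token and delimiter");
            } else if (c == '"' && current.length() == 0) {
                inQuotes = true;
            } else {
                current.append(c);
            }
        }
        if (inQuotes) {
            throw new IOException("unterminated quoted field");
        }
        fields.add(current.toString());
        return fields;
    }

    /** Parses every data line independently; a bad line is recorded and skipped. */
    static List<Map<String, String>> parseAll(String content, List<String> errors) {
        List<Map<String, String>> records = new ArrayList<>();
        String[] lines = content.split("\n");
        if (lines.length == 0) {
            return records;
        }
        String[] header = lines[0].split(","); // first line is the CSV header
        for (int i = 1; i < lines.length; i++) {
            try {
                List<String> values = parseLine(lines[i]);
                Map<String, String> record = new LinkedHashMap<>();
                for (int j = 0; j < header.length && j < values.size(); j++) {
                    record.put(header[j], values.get(j));
                }
                records.add(record);
            } catch (IOException e) {
                // Equivalent of the doCatch branch: log and move on to the next line.
                errors.add("line " + (i + 1) + ": " + e.getMessage());
            }
        }
        return records;
    }
}
```

Because each line is parsed on its own, a malformed record only fails its own iteration. When the whole file is unmarshalled first and the resulting record iterator is split, as in my first attempt, the failure happens while reading the next record and aborts the whole split.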