Search code examples
csvparsingdistributed-systemunivocity

Univocity bean processor showing inconsistent behaviour in distributed system


I am using univocity bean processor for file parsing. I was able to successfully use it on my local box. But on deploying the same code on an environment with multiple hosts, the parser is showing inconsistent behavior. Say for invalid files, it is not failing processing and also for valid files it fails processing some times.

Would like to know if bean processor implementation suitable for a multi-threaded distributed environment.

Sample code:

private void validateFile(@Nonnull final File inputFile) throws NonRetriableException {

    try {
        final BeanProcessor<TargetingInputBean> rowProcessor = new BeanProcessor<TargetingInputBean>(
                TargetingInputBean.class) {

            @Override
            public void beanProcessed(@Nonnull final TargetingInputBean targetingInputBean,
                    @Nonnull final ParsingContext context) {

                final String customerId = targetingInputBean.getCustomerId();
                final String segmentId = targetingInputBean.getSegmentId();
                log.debug("Validating customerId {} segmentId {}  for {} file", customerId, segmentId, inputFile
                        .getAbsolutePath());
                if (StringUtils.isBlank(customerId) || StringUtils.isBlank(segmentId)) {
                    throw new DataProcessingException("customerId or segmentId is blank");
                }

                try {
                    someValidation(customerId);
                } catch (IllegalArgumentException ex) {
                    throw new DataProcessingException(
                            String.format("customerId %s is not in required format. Exception"
                                    + " message %s", customerId, ex.getMessage()),
                            ex);
                }

            }
        };

        rowProcessor.setStrictHeaderValidationEnabled(true);

        final CsvParser parser = new CsvParser(getCSVParserSettings(rowProcessor));
        parser.parse(inputFile);
    } catch (TextParsingException ex) {
        throw new NonRetriableException(
                String.format("Exception=%s occurred while getting & parsing targeting file "
                        + "contents, error=%s", ex.getClass(), ex.getMessage()),
                ex);
    }

}

private CsvParserSettings getCSVParserSettings(@Nonnull final BeanProcessor<TargetingInputBean> rowProcessor) {

    final CsvParserSettings parserSettings = new CsvParserSettings();
    parserSettings.setProcessor(rowProcessor);
    parserSettings.setHeaderExtractionEnabled(true);
    parserSettings.getFormat().setDelimiter(AIRCubeTargetingFileConstants.FILE_SEPARATOR);
    return parserSettings;
}

TargetingInputBean:

public class TargetingInputBean {

@Parsed(field = "CustomerId")
private String customerId;

@Parsed(field = "SegmentId")
private String segmentId;
}

Solution

  • Are you using the latest version?

    I just realized you are probably affected by a bug introduced in version 2.5.0 that was fixed in version 2.5.6 if I'm not mistaken. This plagued me for a while as it was an internal concurrency issue that was hard to track down. Basically when you pass a File without an explicit encoding it will try to find a UTF BOM marker in the input (effectively consuming the first character) to determine the encoding automatically. This happened only for InputStreams and Files.

    Anyway, this has been fixed so simply updating to the latest version should get rid of the problem for you (please let me know if you are not using version 2.5.something)

    If you want to remain with the current version you have there, the error will be gone if you call

    parser.parse(inputFile, Charset.defaultCharset());
    

    This will prevent the parser from trying to discover whether there's a BOM marker in your file, therefore avoiding that pesky bug.

    Hope this helps