
Processing Huge CSV File using Producer - Consumer Pattern


I am trying to process an arbitrary CSV file, which can range from 10 records to a million records. The CSV file has 4 fixed columns (e.g. a, b, c, d) and 2 additional columns (e, f) which come from an external REST API.

My goal is to read all the records from the CSV and, for each record, call the external REST API to fetch the 2 additional columns, then write the result as a merged CSV. The output should be the same CSV file with columns (a, b, c, d, e, f).

I implemented this scenario using the Content Enricher pattern from EIP with Spring Integration and was able to achieve the expected output. However, this solution only works well for a small number of records because I read the CSV file sequentially; as the number of records increases, the execution time grows linearly, O(n).

I then started implementing the Producer-Consumer design pattern so that each record read from the CSV is put into a queue using put(), and multiple consumers read from the same shared queue using the take() method of BlockingQueue. The main program instantiates an ExecutorService with 1 producer and multiple consumers using Executors.newFixedThreadPool(3). However, I am facing a couple of challenges:

  1. The take() method never exits. I tried implementing a Poison Pill by adding a terminator object and then checking for it in the consumer loop to break out, but it still never breaks out (I added a System.out in the loop to see whether it ever reaches the Poison Pill, and it does print my statement). Why does it not exit?

  2. The CSV file only keeps the data from the last executed consumer thread and overwrites everything written by the other consumers. I am using OpenCSV to read / write CSV data.

Here is the code I have so far. Can someone please point out where I am going wrong and which areas of this code could be improved?

Main Program


BlockingQueue<Account> queue = new ArrayBlockingQueue<>(100);
AccountProducer readingThread = new AccountProducer(inputFileName, queue);
//new Thread(readingThread).start();
ExecutorService producerExecutor = Executors.newFixedThreadPool(1);
producerExecutor.submit(readingThread);

AccountConsumer normalizers = new AccountConsumer(outputFileName, queue, accountService);
ExecutorService consumerExecutor = Executors.newFixedThreadPool(3);

for (int i = 1; i <= 3; i++) {
    consumerExecutor.submit(normalizers);
}
producerExecutor.shutdown();
consumerExecutor.shutdown();

AccountProducer

public class AccountProducer implements Runnable {
private String inputFileName;
private BlockingQueue<Account> blockingQueue;
private static final String TERMINATOR = "TERMINATOR";

public AccountProducer (String inputFileName, BlockingQueue<Account> blockingQueue) {

    this.inputFileName = inputFileName;
    this.blockingQueue = blockingQueue;
}


@Override
public void run() {

    try (Reader reader = Files.newBufferedReader(Paths.get(inputFileName));) {

        PropertyEditorManager.registerEditor(java.util.Date.class, DateEditor.class);
        ColumnPositionMappingStrategy<Account> strategy = new ColumnPositionMappingStrategy<>();
        strategy.setType(Account.class);
        String[] memberFieldsToBindTo = { "accountId", "accountName", "firstName", "createdOn" };
        strategy.setColumnMapping(memberFieldsToBindTo);

        CsvToBean<Account> csvToBean = new CsvToBeanBuilder<Account>(reader).withMappingStrategy(strategy)
                .withSkipLines(1).withIgnoreLeadingWhiteSpace(true).build();

        Iterator<Account> csvAccountIterator = csvToBean.iterator();

        while (csvAccountIterator.hasNext()) {
            Account account = csvAccountIterator.next();    
            // Checking if the Account Id in the csv is blank / null - If so, we skip the
            // row for processing and hence avoiding API call..
            if (null == account.getAccountId() || account.getAccountId().isEmpty()) {
                continue;
            } else {
                // This call will send the extracted Account Object down the Enricher to get
                // additional details from API
                blockingQueue.put(account);
            }
        }
    } catch (InterruptedException | IOException ex) {
        System.out.println(ex);
    } finally {
        while (true) {
            try {
                Account account = new Account();
                account.setAccountId(TERMINATOR);
                blockingQueue.put(account);
                break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
}

AccountConsumer

public class AccountConsumer implements Runnable {

private String outputFileLocation;
private BlockingQueue<Account> blockingQueue;
private AccountService accountService;

public AccountConsumer(String outputFileLocation, BlockingQueue<Account> blockingQueue, AccountService accountService) {
    this.blockingQueue = blockingQueue;
    this.outputFileLocation = outputFileLocation;
    this.accountService = accountService;
}

@Override
public void run() {
    List<Account> accounts = new ArrayList<>();

    try {
        while (true) {
            Account account = blockingQueue.poll();
            account = accountService.populateAccount(account);
            accounts.add(account);
        }

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (Exception ex) {
        System.out.println(ex);
    }
    processOutput(accounts, outputFileLocation);
}

/**
 * The method processOutput simply takes the list of Accounts and writes them to
 * CSV.
 * 
 * @param outputFileName
 * @param accounts
 * @throws Exception
 */
private void processOutput(List<Account> accounts, String outputFileName) {

    System.out.println("List Size is : " + accounts.size());
    // Using try with Resources block to make sure resources are released properly
    try (Writer writer = new FileWriter(outputFileName, true);) {
        StatefulBeanToCsv<Account> beanToCsv = new StatefulBeanToCsvBuilder(writer).build();
        beanToCsv.write(accounts);
    } catch (CsvDataTypeMismatchException | CsvRequiredFieldEmptyException ex) {
        System.out.println(ex);
        //logger.error("Unable to write the output CSV File : " + ex);
        //throw ex;
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}

Here is the Spring Integration XML I am using:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/task 
http://www.springframework.org/schema/task/spring-task.xsd
    http://www.springframework.org/schema/integration 
http://www.springframework.org/schema/integration/spring-integration.xsd"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:beans="http://www.springframework.org/schema/beans" 
xmlns:task="http://www.springframework.org/schema/task">

<channel id="accountChannel" /> 
<!-- accountOutputChannel is used for putting the Account object on the 
Channel which will then be consumed by accountAPIChannel as Input to the API 
-->
<channel id="accountOutputChannel" />
<!-- accountAPIChannel will take 1 accountId at a time and invoke REST API 
Service to get additional details required to fill the Content Enricher -->
<channel id="accountAPIChannel" />

<!-- accountGateway is the entry point to the utility -->
<gateway id="accountGateway" default-request-timeout="5000"
    default-reply-timeout="5000"
    service-interface="com.epe.service.AccountService"
    default-request-channel="accountChannel">
</gateway>

<!--Content  enricher is used here for enriching an existing message with 
additional data from External API
     This is based on EIP Pattern - Content Enricher -->
<enricher id="enricher" input-channel="accountOutputChannel"
    request-channel="accountAPIChannel">
    <property name="status" expression="payload.status" />
    <property name="statusSetOn" expression="payload.createdOn" />
</enricher>

<beans:bean id="accountService"
    class="com.epe.service.impl.AccountServiceImpl" />

<!-- Below service-activator is used to actually invoke the external REST 
API which will provide the additional fields for enrichment -->
<service-activator id="fetchAdditionalAccountInfoServiceActivator"
    ref="accountInfoService" method="getAdditionalAccountInfoService" 
input-channel="accountAPIChannel"
    />

<!-- accountInfoService is a bean which will be used for fetching 
additional information from REST API Service -->
<beans:bean id="accountInfoService"
    class="com.epe.service.impl.AccountInfoServiceImpl" />

</beans:beans>

Solution

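  • The code you posted calls poll(), not take(). The no-argument poll() does not wait: it returns null immediately when the queue is empty. The consumer loop shown also never checks for the terminator, so it has no exit condition.

    You should use poll() with a timeout instead, e.g. poll(10, TimeUnit.SECONDS), so that each thread can terminate gracefully once no more data arrives.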
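
As a rough illustration of that termination logic, here is a minimal, self-contained sketch. It is not the poster's code: the class `PoisonPillDemo`, the `run` method, and the `POISON` sentinel are hypothetical names, plain strings stand in for `Account` objects, and a counter stands in for the REST call. It assumes one pill is queued per consumer, since a single pill is taken off the queue by only one of the consumer threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {

    // Hypothetical sentinel, compared by identity so it cannot collide with real data.
    private static final String POISON = new String("TERMINATOR");

    /** Runs one producer and the given number of consumers; returns how many records were consumed. */
    public static int run(int records, int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        AtomicInteger processed = new AtomicInteger();
        // One thread per task, so the producer and all consumers run concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);

        // Producer: enqueue every record, then one poison pill per consumer.
        pool.submit(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    queue.put("record-" + i);
                }
                for (int i = 0; i < consumers; i++) {
                    queue.put(POISON);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        // Timed poll: returns null if nothing arrives within the timeout.
                        String item = queue.poll(10, TimeUnit.SECONDS);
                        if (item == null || item == POISON) {
                            break; // timed out, or told to stop
                        }
                        processed.incrementAndGet(); // stand-in for the enrichment call
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Processed: " + run(1000, 3));
    }
}
```

Because the queue is FIFO and the pills are queued after the last record, every record is taken before any consumer sees a pill. The timeout is only a safety net for the case where the producer dies without queuing its pills.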

    However, you don't need all this logic; you can achieve all of this with Spring Integration components: an ExecutorChannel and a file outbound channel adapter in append mode.

    EDIT

    I don't have time to write your entire application, but essentially you need...

    <file:inbound-channel-adapter />
    <file:splitter output-channel="execChannel"/>
    <int:channel id="execChannel">
        <int:dispatcher task-executor="exec" />
    </int:channel>
    <int:transformer /> <!-- OpenCSV -->
    <int:enricher ... />
    <int:transformer /> <!-- OpenCSV -->
    <int:resequencer /> <!-- restore order -->
    <file:outbound-channel-adapter />
    

    You can read about all these components in the reference manual.

    You might also want to consider using the Java DSL instead of XML; something like...

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Files.inboundAdapter(...))
                  .split(Files.splitter())
                  .channel(MessageChannels.executor(exec()))
                  .transform(...)
                  .enrich(...)
                  .transform(...)
                  .resequence()
                  .handle(Files.outboundAdapter(...))
                  .get();