Tags: database, mule, mule-studio, mule-component, anypoint-studio

How to filter out the bad records from a CSV file and perform the required operations using Anypoint Studio


I am working on a flow that is supposed to take a CSV file from the file system and insert the contained data into a database. Some of the records in the file have the wrong format (e.g. the wrong number of columns) and must therefore be written to a separate text file for analysis.

I have created a flow that inserts all the good records into the database, but it does not write the bad records to a file. I am a beginner and am not sure how to proceed.

The XML code:

 <flow name="fileFlow">
    <file:inbound-endpoint path="src/main/resources/Input" moveToPattern="#[message.inboundProperties.originalFilename].zip" moveToDirectory="src/main/resources/Output" responseTimeout="10000" metadata:id="b85f6b05-1679-4b60-8bbe-30e6d2c68df7" doc:name="File">
        <file:filename-regex-filter pattern=".*csv" caseSensitive="true"/>
    </file:inbound-endpoint>
    <file:file-to-string-transformer doc:name="File to String"/>
    <set-payload value="#[payload.replaceAll(&quot;,,&quot;, &quot;, ,&quot;)]" doc:name="Set Payload"/>
    <splitter expression="#[rows=StringUtils.split(message.payload,'\r\n');ArrayUtils.subarray(rows,1,rows.size())]" doc:name="Splitter"/>
    <flow-ref name="fileFlow1" doc:name="fileFlow1"/>
    <catch-exception-strategy doc:name="Insert the bad record into a file">
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <set-session-variable variableName="var" value="#[payload+'var']" doc:name="Session Variable"/>
        <file:outbound-endpoint path="src/main/resources/Output" outputPattern="BadRecords.txt" responseTimeout="10000" doc:name="File"/>
        <flow-ref name="fileFlow1" doc:name="fileFlow1"/>
    </catch-exception-strategy>
</flow>
<flow name="fileFlow1">
    <expression-transformer expression="#[StringUtils.split(message.payload,',')]" doc:name="Expression"/>
    <db:insert config-ref="MySQL_Configuration" doc:name="Database">
        <db:parameterized-query><![CDATA[insert into GoodRecords values(#[message.payload[0]], #[message.payload[1]], #[message.payload[2]], #[message.payload[3]], #[message.payload[4]], #[message.payload[5]], #[message.payload[6]], #[message.payload[7]], #[message.payload[8]], #[message.payload[9]], #[message.payload[10]], #[message.payload[11]], #[message.payload[12]], #[message.payload[13]], #[message.payload[14]], #[message.payload[15]], #[message.payload[16]], #[message.payload[17]], #[message.payload[18]], #[message.payload[19]], #[message.payload[20]])]]></db:parameterized-query>
    </db:insert>
    <logger message="#[payload] " level="INFO" doc:name="Logger"/>
</flow>

The flow structure:

[Flow diagram]

Personally, I think the flow I've created is inefficient and probably wrong.

How do I write the bad records to the file (assuming the flow above is otherwise right)? I also wanted to use bulk mode for this use case, since there are around 1,000 records to work with, but I am not sure how to proceed with that either.


Solution

  • In your code you insert the record into the database from a separate private flow (`fileFlow1`). An exception thrown inside a flow invoked via `flow-ref` is handled by that flow's own exception strategy, so it will not be caught by the parent flow's `catch-exception-strategy`. Either give the private flow its own exception strategy, or convert it to a `<sub-flow>`, which inherits the caller's processing and exception handling, so failures propagate back to the parent's `catch-exception-strategy`.
  • A cleaner solution is to use batch processing: process the records in a batch job and add a separate batch step that accepts only failed records to handle them all in one place.
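
A minimal sketch of the batch approach (Mule 3 batch syntax). The job and step names are illustrative; the endpoints, splitter expression, and insert query are carried over from the question's flow, and the column list is abbreviated here for readability:

    <batch:job name="csvImportBatch">
        <batch:input>
            <file:inbound-endpoint path="src/main/resources/Input" responseTimeout="10000" doc:name="File">
                <file:filename-regex-filter pattern=".*csv" caseSensitive="true"/>
            </file:inbound-endpoint>
            <file:file-to-string-transformer doc:name="File to String"/>
            <!-- One batch record per CSV line, skipping the header row -->
            <splitter expression="#[rows=StringUtils.split(message.payload,'\r\n');ArrayUtils.subarray(rows,1,rows.size())]" doc:name="Splitter"/>
        </batch:input>
        <batch:process-records>
            <!-- Runs for every record; a failing insert marks that record as failed -->
            <batch:step name="insertStep">
                <expression-transformer expression="#[StringUtils.split(message.payload,',')]" doc:name="Expression"/>
                <db:insert config-ref="MySQL_Configuration" doc:name="Database">
                    <db:parameterized-query><![CDATA[insert into GoodRecords values(#[message.payload[0]], #[message.payload[1]], ... , #[message.payload[20]])]]></db:parameterized-query>
                </db:insert>
            </batch:step>
            <!-- Runs only for records that failed in an earlier step -->
            <batch:step name="badRecordsStep" accept-policy="ONLY_FAILURES">
                <file:outbound-endpoint path="src/main/resources/Output" outputPattern="BadRecords.txt" doc:name="File"/>
            </batch:step>
        </batch:process-records>
    </batch:job>

This way each record succeeds or fails independently, the good ones are inserted, and every bad one reaches the `ONLY_FAILURES` step without any explicit exception strategy.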