Reading and parsing large CSV file using 'non-repeatable' stream


Thanks to harshank bansal, Ryan Hoegg and aled for their suggestions and comments on an earlier question.

I am using Mule 4.4 Community Edition, on premise.

Based on those comments, I am now reading the large CSV file as a non-repeatable stream.

The file is a CSV file, but pipe-separated:

101|John Saunders|19|M|Physics|Chemistry|Mechanics
102|Jim White|17|M|Languages|Art|Pottery
...
...

Once the file is read, I check for an empty file using attributes.size == 0. After that check I use Set Payload with deferred=true.

In debug mode, after this component the payload shows up as java.io.PipedInputStream (which is fine; this is simply an observation).

I then pass this payload to the HTTP Request component, which calls a REST endpoint. For now I have mocked this REST endpoint and am logging the payload it receives.

However, the payload received by this REST endpoint is an empty array.

Do I need to set any attribute on HTTP Request? I can see properties such as Request Streaming Mode; do we need to configure anything there?

Another doubt: how many times is the REST service expected to be called? Will it get called multiple times, or only as a single request?

Here is the complete code of what I have tried.

    <flow name="get:employee" >
    
        
        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
        
            <file:read doc:name="Read emp file"  
                config-ref="File_Config" 
                path="/emp_large.unl" 
                outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
                <non-repeatable-stream />
            </file:read>
        
        <try doc:name="Try" >
                <choice doc:name="is the file empty ?" >
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId  : #[correlationId]  Empty payload from file: #[vars.fileName] !" category="load.empData" />
                <raise-error doc:name="Raise error on empty file "  type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                
            </otherwise>
        </choice>
            <set-payload value='#[output application/json deferred=true&#10;&#10;&#10;&#10;---&#10;&#10;&#10;payload map (value,index)-&gt;{&#10;    "id": value.column_0,&#10;    "name": value.column_1&#10;}]' doc:name="Set Payload"  />
            <http:request method="POST" doc:name="Submit products to Rest API"  config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="NEVER">
                <non-repeatable-stream />
                <http:headers ><![CDATA[#[output application/java
---
{
    
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
                <http:response-validator >
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
                <logger level="INFO" doc:name="Logger" category="load.empData" message="Co-relationId  : #[correlationId]  Successfully published emp data to XYZ , response received is : #[payload]"/>
            <error-handler >
                    <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" >
                        <logger level="ERROR" doc:name="Failure log" message="Co-relationId  : #[correlationId] error encountered after reading  file #[vars.fileName]  , caused by : #[error.detailedDescription]" category="load.empData"/>
                    
                    </on-error-continue>
                </error-handler>
            </try>
        <logger level="INFO" doc:name="Logger"  message="Co-relationId  : #[correlationId]  processing emp data END:" category="load.empData"/>
        
    </flow>

Attaching a screenshot of the flow for easier visualisation: [image: screenshot of the flow]

Edit 1: Pasting the complete code below:

<flow name="get:employee" doc:id="feffbaae-2873-4248-a043-d51697083b75">
    <logger level="INFO" doc:name="Logger" doc:id="10c9e0bb-7f18-42b8-9378-1225cb546641" message="Co-relationId  : #[correlationId]  processing emp data START:" category="send.empData"/>
        
        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
        
        <logger level="INFO" doc:name="Logger" doc:id="0f182868-404c-491e-acab-88832b73d73e" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully moving file "/>
        <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="cec72a48-72b7-436d-a024-fb3986fb3432" millisBetweenRetries="${sftp.retry.frequency}">
            <file:read doc:name="Read emp file" doc:id="e77633d5-5f4f-43a9-862b-9d6076308c2a" 
                config-ref="File_Config" 
                path="C:\emp_large.unl" 
                outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
                <non-repeatable-stream />
            </file:read>
        </until-successful>
        <logger level="INFO" doc:name="Logger" doc:id="438f9b64-8d66-4999-ac72-cdd7ade3cd0f" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully Reading file "/>
        <try doc:name="Try" doc:id="c934788a-28db-4e49-a7cd-ee8eaff026ae" >
                <choice doc:name="is the file empty ?" doc:id="69c57e57-354c-4a01-9810-35fc4228d5d9">
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " doc:id="08decd27-c8dc-41e7-8c1d-f395f405b248" message="Co-relationId  : #[correlationId]  Empty payload from file!" category="send.empData" />
                <raise-error doc:name="Raise error on empty file " doc:id="ad6e2e62-09f2-4e4a-bc2c-99d7ce2c95c8" type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Payload not empty" doc:id="606550aa-c59e-4ff4-ad15-026a8616845a" message="Co-relationId  : #[correlationId]  File not empty and contains #[sizeOf(payload)] rows" category="send.empData" />
            </otherwise>
        </choice>
            <set-payload value='#[output application/json deferred=true&#10;&#10;&#10;&#10;---&#10;&#10;&#10;&#10;  {"clientId": "abcd",&#10;"employees": payload map (value,index)-&gt;{&#10;    "id": value.column_0,&#10;    "name": value.column_1&#10;}&#10;}]' doc:name="Set Payload" doc:id="5dba41e5-df65-42e3-8899-0c4abf8f8c16" />
            <http:request method="POST" doc:name="Submit products to XYZ" doc:id="575d1c2f-d20f-4174-b7e6-ad0074ea7eb9" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="AUTO" sendBodyMode="AUTO">
                <non-repeatable-stream />
                <http:headers ><![CDATA[#[output application/java
---
{
    
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
                <http:response-validator >
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
                <logger level="INFO" doc:name="Logger" doc:id="d3988977-bdda-4eac-bf88-508201582a78" category="send.empData" message="Co-relationId  : #[correlationId]  Successfully published product data to XYZ , response received is : #[payload]"/>
            <error-handler >
                    <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue - Move emp file from processing to error folder" doc:id="a02560ca-77fb-443f-88b0-fcc27185ea7c" >
                        <logger level="ERROR" doc:name="Failure log" doc:id="c4a10961-d853-4fc4-87d7-9b5c55750a7c" message="Co-relationId  : #[correlationId] error encountered after reading  file  , caused by : #[error.detailedDescription]" category="send.empData"/>
                    <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="d8b3e71a-02fc-4c19-be7f-3851288b85a0" millisBetweenRetries="${sftp.retry.frequency}">
                        <sftp:move doc:name="Move file to error" doc:id="22b353e3-ec17-4df5-aa20-4055e0a6cf3b" config-ref="SFTP_Config" sourcePath="#[p('sftp.outputProcessingDir') ++ '/' ++ vars.fileName]" targetPath="${sftp.outputErrorDir}" createParentDirectories="false" overwrite="true"/>
                    </until-successful>
                    <logger level="INFO" doc:name="Logger" doc:id="dcc9dd6b-0734-486d-b0c1-a9fc3fe64348" category="send.empData" message="Co-relationId  : #[correlationId]  successfully moved  file  to error folder"/>
                    </on-error-continue>
                </error-handler>
            </try>
        <logger level="INFO" doc:name="Logger" doc:id="20453f8a-45ad-4e23-98d5-a03a66509b5d" message="Co-relationId  : #[correlationId]  processing emp data END:" category="send.empData"/>
        <error-handler >
            <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" doc:id="3fa169c0-92ad-431b-9440-a21448db4bce" type="RETRY_EXHAUSTED">
                <logger level="ERROR" doc:name="Logger" doc:id="bf68b097-9b02-4770-b0f3-9972bc91a97f" message="Co-relationId  : #[correlationId] while processing emp file Error is : #[error.suppressedErrors[0].errorType] caused by #[error.suppressedErrors[0].detailedDescription]" category="send.empData"/>
            </on-error-continue>
        </error-handler>
    </flow>

Solution

  • That is another downside of using a non-repeatable stream. It happens because the breakpoint you set makes the debugger consume the InputStream, and an input stream can only be consumed once. Therefore your set-payload receives an empty payload when it executes. Having said that, I believe the debugger should be smart enough to detect that the payload is a non-repeatable stream and not consume it.

    Try disabling the breakpoint; if everything else is set correctly, it should send the request to the HTTP endpoint. If you want to see the payload, you can try logging a few rows after the set-payload once, then remove the logger. (I know it does not sound appealing, but it is the only option I can think of with the current version of Anypoint Studio.)

    EDIT: Another reason you are sending an empty array is that you are consuming the InputStream while logging sizeOf(payload). Therefore your set-payload gets an empty stream to consume.
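
    A minimal sketch of one way to avoid this, offered as an assumption rather than the asker's final code: the otherwise branch can log the file size from attributes.size, which the flow already uses for the empty-file check, instead of calling sizeOf(payload). Reading an attribute does not touch the stream. Note that this reports bytes, not a row count.

    ```xml
    <otherwise>
        <!-- attributes.size is file metadata (size in bytes); evaluating it leaves the payload stream unread -->
        <logger level="INFO" doc:name="Payload not empty"
            message="Co-relationId  : #[correlationId]  File not empty, size is #[attributes.size] bytes"
            category="send.empData" />
    </otherwise>
    ```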

    As for your question:

    How many times is the REST service expected to be called? Will it get called multiple times, or only as a single request?

    This basically uses Transfer-Encoding: chunked. So the payload goes out in a single request, but since it is not fully generated before the request is sent, it is sent in chunks until the request is finished.
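
    If you would rather force streaming than leave the decision to the connector, requestStreamingMode can be set explicitly on the HTTP Request. The sketch below reuses the request from the question and assumes the connector's documented values (AUTO, ALWAYS, NEVER). As I understand it, ALWAYS sends the body with Transfer-Encoding: chunked, while NEVER buffers the whole payload to compute Content-Length, which works against a deferred, non-repeatable stream for a large file.

    ```xml
    <!-- Sketch only: same request as in the question, with streaming forced on -->
    <http:request method="POST" doc:name="Submit products to XYZ"
        config-ref="HTTP_Request_config" path="/emps"
        requestStreamingMode="ALWAYS">
        <non-repeatable-stream />
        <http:headers><![CDATA[#[output application/java
    ---
    {
        "Host" : p('http_emp.host'),
        "Content-Type" : "application/json"
    }]]]></http:headers>
        <http:response-validator>
            <http:success-status-code-validator values="200" />
        </http:response-validator>
    </http:request>
    ```

    Either way the REST endpoint is still called once; only the way the body is framed on the wire changes.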