
Memory issues: parse a large CSV file, transform it to JSON, and then call a third-party REST service using Mule 4.4 Community Edition


I am using Mule 4.4 Community Edition (on-premise). I have a large CSV file containing a list of employees:

101,John Saunders,19,M,Physics,Chemistry,Mechanics
102,Jim White,17,M,Languages,Art,Pottery
...
...

The file size is 700MB

This file will be dropped at an SFTP location for Mule 4.4 Community Edition to read. NOTE: I need to parse and transform the data into JSON and then call a third-party REST service. I only need to pick up student details if the student has 'Physics' in any of their subjects. So the transformed JSON would look like this:

[{"id":"101","name": "John Saunders","age": "19" , "gender": "M"},{...}]

The challenge I am facing is this: after reading the file, I check that the file is NOT empty or null with:

sizeOf(payload) == 0

With a file size of just over 1 MB, the code already fails in Anypoint Studio with the following error:

Internal execution exception while executing the script, this is most probably a bug, file an issue with the script and the input data. NOTE: Enable dump option using -Dcom.mulesoft.dw.dump_files=true to log all required information. Caused by: org.mule.runtime.api.streaming.exception.StreamingBufferSizeExceededException: Buffer has exceeded its maximum size of 1048576 evaluating expression: "sizeOf(payload) == 0

I can try increasing the available heap memory (though at this moment I don't know exactly how), but my question is about which approach to take. I can think of the following options:

#1 Use Batch processing ( not an option since Batch not available in Community edition of mule )

#2 Increase heap memory ( I don't want to go this way because, if file sizes increase further, this solution will not scale )

#3 Breakdown large file into smaller files and then process

#4 Wanted to learn as well as explore any options of streaming / chunking in mule 4

Option #4 is what I would like to explore. I was searching the internet and found one article here.

So while reading the file from SFTP, I cannot use 'Repeatable file-store stream' but I can use 'Repeatable in memory stream':

<sftp:read doc:name="Read Empfile" config-ref="SFTP_Config" path="\working\emp.csv" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <repeatable-in-memory-stream />
    </sftp:read>

Will this not read the entire file into memory? Or will it only read part of the file?

Here is the entire code I am trying :

<flow name="load_emp" doc:id="81a7b13e-1d38-4722-9015-dd03e765023c" >
<sftp:read doc:name="Read emp file" doc:id="b8662f19-f442-4150-b74d-8339fedee72b" config-ref="SFTP_Config" path="\working\emp.unl" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <repeatable-in-memory-stream />
    </sftp:read>
    <try doc:name="Try" doc:id="2a0c2d4c-23db-4d49-aa36-634720bb45b8" >
        <choice doc:name="Choice" doc:id="e2b67582-f327-4974-9fb4-875af7962e6e" >
            <when expression="sizeOf(payload) == 0">
                <raise-error doc:name="Raise error" doc:id="290c67ca-4ae6-47f5-8d63-bd54cf122604" type="ANY" description="File is empty"/>
            </when>
            <otherwise >
                <logger level="INFO" doc:name="not empty" doc:id="a68045a7-ba59-4c7a-915e-2145de5c3b4b" />
            </otherwise>
        </choice>
    </try>
    <set-payload value='#[output application/json
&#10;
&#10;
&#10;
&#10;---
&#10;payload map (value,index)-&gt;{
&#10;    "id": value.column_0,
&#10;    "name": value.column_1
&#10;}]' doc:name="Set Payload" doc:id="17277532-f496-4a61-8d9f-136deaeb8104" />
        <http:request method="POST" doc:name="Request" doc:id="3d1982e2-9740-4866-a08e-a29b4e1596c0" config-ref="HTTP_Request_configuration1" path="/load"/>
    
</flow>

Here is a schematic to give a visual representation of the flow: [image]

So I am confused about how streaming will actually work in this flow. Do I need to use a For Each as per the linked article? Let's say I have 100,000 records in the file and I set a batch size of 1000 in the For Each. Does this mean that only 1000 records at a time are loaded into memory and sent to the REST API?

If yes, does this mean that I will have to make numerous calls to the REST endpoint ( 100 times? )

I just need a simple explanation of the way to go and what the behaviour would be, please. I have read numerous articles about streaming but the penny does not drop!


Solution

  • As mentioned in the previous answer, you should use attributes.size to get the file size instead of recalculating it with sizeOf(). That will at least let your flow get past the choice router; however, it will still fail as soon as you map the CSV to JSON in the set-payload. With the configuration that you shared, increasing the heap size will not help; you would have to increase the Max Buffer Size parameter of the repeatable-in-memory-stream. But there is an alternative.

    You need to pick the correct streaming strategy first.

    1. non-repeatable-stream: If the server that you are sending the data to can handle chunked requests, this is probably the best option for you, since you do not want to increase the memory of your app and are looking for a way to read the data in chunks. It will not load the data into memory unless you make it do so. It makes the streaming payload behave like Java's traditional InputStream, in the sense that you can consume the payload only once. Therefore, before going forward with this, make sure you do not need the payload more than once. For example, if you log the number of rows with sizeOf(payload), that consumes the payload and you are left with an empty payload afterwards. You will need to make the following changes:

      • Set the streaming mode to non-repeatable-stream.
      • Change the choice to use attributes.size == 0 instead of sizeOf(payload) == 0.
      • Update the map to generate deferred output. Don't be put off by the term if you have not heard it before; it basically means streaming the output. Refer to this documentation before using it. If you do not use it, the output of the map function will again be loaded into memory.
    output application/json deferred=true
    ---
    payload map (value,index)->{
        "id": value.column_0,
        "name": value.column_1
    }
    

    With this you can process this 700MB file even if you limit your heap to 512 MB (you should not but you can).

    Note: After this the HTTP Request component will send chunked data. You may also want to confirm if the server that you are sending the JSON to is able to handle it correctly.
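
    Putting the three changes above together, a minimal sketch of the flow could look like the following. It reuses the names from the question (SFTP_Config, HTTP_Request_configuration1, the path and the separator). The error type APP:EMPTY_FILE is a hypothetical custom type. The filter on column_4 to column_6 is an assumption that the three subjects always sit in the fifth to seventh columns, as in the sample rows; drop it if that does not hold for your file.

    ```xml
    <flow name="load_emp">
        <!-- Read the file as a one-shot stream: it can be consumed only once -->
        <sftp:read doc:name="Read emp file" config-ref="SFTP_Config" path="\working\emp.unl" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <non-repeatable-stream />
        </sftp:read>
        <choice doc:name="Choice">
            <!-- attributes.size comes from the file metadata, so the payload stream is not touched -->
            <when expression="#[attributes.size == 0]">
                <!-- APP:EMPTY_FILE is a hypothetical custom error type -->
                <raise-error doc:name="Raise error" type="APP:EMPTY_FILE" description="File is empty"/>
            </when>
            <otherwise>
                <logger level="INFO" doc:name="not empty" message="File is not empty"/>
            </otherwise>
        </choice>
        <!-- deferred=true streams the JSON output instead of building it in memory -->
        <set-payload value='#[output application/json deferred=true
    &#10;---
    &#10;payload
    &#10;  filter ((row) -&gt; [row.column_4, row.column_5, row.column_6] contains "Physics")
    &#10;  map ((row) -&gt; {
    &#10;    "id": row.column_0,
    &#10;    "name": row.column_1,
    &#10;    "age": row.column_2,
    &#10;    "gender": row.column_3
    &#10;  })]' doc:name="Set Payload"/>
        <!-- The payload is consumed here, once, as it is written to the request body -->
        <http:request method="POST" doc:name="Request" config-ref="HTTP_Request_configuration1" path="/load"/>
    </flow>
    ```

    Note that nothing between the read and the HTTP request touches the payload itself (no logger printing it, no sizeOf), because any such step would consume the stream before it reaches the request.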

    2. repeatable-in-memory-stream: If you use this, AFAIK, you will have to increase the Max Buffer Size parameter like this, as your payload is quite large.

    [image] It is currently 1 MB (the error reports a maximum size of 1048576 bytes), which is why it fails as soon as the data exceeds 1 MB. You will have to experiment to find the value that works best for you. This streaming strategy needs to load the full payload into memory when you consume it (in order to make it repeatable). You may also have to increase the heap size if you increase the Max Buffer Size by a lot. You can also increase the maximum streaming memory with this parameter: -Dmule.max.streaming.memory=1024000000
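
    In the XML view, the Max Buffer Size setting corresponds to attributes on the repeatable-in-memory-stream element. The sketch below is only an illustration: the attribute names are the ones I believe the Mule 4 schema uses (initialBufferSize, bufferSizeIncrement, maxBufferSize, bufferUnit), and the numbers are placeholder values, not recommendations. Verify both against your runtime version.

    ```xml
    <sftp:read doc:name="Read emp file" config-ref="SFTP_Config" path="\working\emp.unl" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
        <!-- Placeholder values: start at 1 MB, grow in 64 MB steps, fail beyond 1024 MB -->
        <repeatable-in-memory-stream initialBufferSize="1" bufferSizeIncrement="64" maxBufferSize="1024" bufferUnit="MB" />
    </sftp:read>
    ```

    With a limit this large the whole 700 MB file can end up in the buffer, so the heap has to be sized accordingly; this is the scaling concern raised in option #2 of the question.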