Search code examples
muledataweavemulesoftanypoint-studio

Creating a batch job in mule that takes a number of objects


I would like create batch job that takes only 3 batch block size (but in the beggining I have 9 or 13 or 15 objects every call could be different) in one time and then create the response and then go back to the batch job because I have more then 3. And I would like after batch job to have my response not the response from the batch job (I mean this default. How I can do that?)

My code looks like this:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="bba7ca43-7816-49a7-b6f9-964da023ca9e" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <flow name="batchjobFlow" doc:id="c48e4b0d-9721-426d-805f-0d796703a398" >
        <http:listener doc:name="Listener" doc:id="d4a038c9-7a2d-4354-9e39-03c97bca6fb3" config-ref="HTTP_Listener_config" path="/batchJob"/>
        <ee:transform doc:name="Payload" doc:id="6a45eb4c-b3ba-4200-8d9e-cfbc40312132" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
[
  {
    "examplePerson": {
      "personOne": 700000,
      "PersonTwo": 8000,
      "adress": false
    },
    "Id": "aQ",
    "Name": "Women",
    "status": "Fail"
  },
  {
    "examplePerson": {
      "personOne": 700000,
      "PersonTwo": 8000,
      "adress": false
    },
    "Id": "a0MAQ",
    "Name": "Old",
    "status": "SUCCESS"
  },
   {
    "examplePerson": {
      "personOne": 700000,
      "PersonTwo": 8000,
      "adress": false
    },
    "Id": "a000000EAQ",
    "Name": "Old",
    "status": "SUCCESS"
  }
]]]></ee:set-payload>
            </ee:message>
            <ee:variables >
                <ee:set-variable variableName="chunkNumber" ><![CDATA[%dw 2.0
output application/json
---
1]]></ee:set-variable>
            </ee:variables>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" doc:id="f4b913ba-e38c-46d1-8f33-9ffd27277687" message="#[payload]" category="before"/>
        <batch:job jobName="limitChunk" doc:id="81e4b135-97eb-448d-864f-85b52d72feb9" blockSize="2">
            <batch:process-records >
                <batch:step name="Batch_Step" doc:id="5dbe5bb5-0c9a-44bf-8693-54156b1ba112">
                    <logger level="INFO" doc:name="Logger" doc:id="29e0e433-4543-48c8-9fa2-65ebeef32759" message="#[payload]" category="bnnn" />
                    <ee:transform doc:name="Response" doc:id="58978942-b610-43c4-8932-a8116c3a9e70">
                        <ee:message>
                            <ee:set-payload><![CDATA[%dw 2.0
import * from dw::core::Strings
import toBase64 from dw::core::Binaries

var success = payload filterObject ((value, index) -> value."status" == "SUCCESS")
var failed = payload filterObject ((value, index) -> value."status" == "FAIL")
var array = [payload]
output application/json
---
{
    "Names": ["Message"],
    "Demo": {
        "Message": {
            "type": "Message"
        },
            (if ( (50 as Number * 1 as Number) >= sizeOf(payload default [] ) ){
            "One": "true",
        }
               else 
                {
            "One": "false"
        }),

        
    },
    "objectTest": flatten ([((array map (value,index) -> {
        "Demo": {
            "Message": "C"
        },
        Id: value.Id,
        (examplePerson: value.examplePerson.personOne) if(value.examplePerson.personOne != null),
        Name: value.Name,
        Status: value.Status    
    }))]),

}]]></ee:set-payload>
                        </ee:message>
                    </ee:transform>
                </batch:step>
            </batch:process-records>
        </batch:job>
        <logger level="INFO" doc:name="Logger" doc:id="2e3f0e66-a334-406a-9942-a9ded2326cd8" message="#[payload]" category="after batchJob"/>
        <ee:transform doc:name="Transform Message" doc:id="c7c635b9-59fb-4de6-9ba2-b2b4a4b34b21" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <choice doc:name="Choice" doc:id="f77304fc-e4cb-4516-b7e7-d8a5d32f172f" >
            <when expression='#[(vars.chunkNumber &gt; 0)]'>
                <flow-ref doc:name="batchjobFlow" doc:id="8b947915-9f8b-48d0-b1c8-c87760337194" name="batchjobFlow" />
            </when>
            <otherwise >
                <logger level="INFO" doc:name="Logger" doc:id="8fbee201-5bba-4b59-9e07-51646fdf5c47" message='#["END"]' category="END"/>
                <ee:transform doc:name="Response" doc:id="b175bfc8-8d1f-4f38-a44b-f5b85d568722" >
                    <ee:message >
                        <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
            </otherwise>
        </choice>
    </flow>
</mule>

Solution

  • If you want groups of 3 to be processed one by one, and you want the changed payloads to be available afterward, consider using dataweave dw::core::Arrays::divideBy and parallel-for-each with maxConcurrency=1.

    We are not using parallel-for-each for its parallel processing, but instead for its ability to provide a composite payload result of all the processed items after completion.

    First, use a transform-message to create a payload using divideBy from dataweave to make an array of 3-item arrays, then use parallel-for-each with maxConcurrency="1" to process all the 3 item arrays one by one.