I have a Mule 3.3.0 flow which splits a file into records. I need to execute an action (a stored procedure) AFTER ALL records have finished processing. The problem is that sometimes the action gets executed before all records have been processed by Mule. I think this is because Mule processes records in parallel, which is great, but it means the final action sometimes gets called too early. If I set the flow as synchronous things appear to work, but then I'm not taking advantage of parallel execution. I think I could also use a Foreach scope (haven't tried), but I guess the records still won't be processed in parallel. Is there a way to "wait" until all records finish processing?
I'm attaching a very simple flow which exhibits this behaviour. If you run it you will see that the loggers don't print in order; in fact, the "DONE" message gets logged before the rest. The flow processes a simple CSV file until it matches a field with the value "end". There is a choice component which logs "DONE" when that field is found. The rest of the fields simply get logged.
Any help will be greatly appreciated.
Flow:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
    xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
        http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd ">
    <file:connector name="inputFileConnector" autoDelete="true"
        streaming="false" validateConnections="true" doc:name="File" fileAge="60000"
        readFromDirectory="#{systemProperties['user.home']}" />
    <flow name="flow1" doc:name="flow1" processingStrategy="synchronous">
        <file:inbound-endpoint path="#{systemProperties['user.home']}"
            responseTimeout="10000" doc:name="Input File" fileAge="100"
            connector-ref="inputFileConnector">
            <file:filename-regex-filter pattern="input.csv"
                caseSensitive="false" />
        </file:inbound-endpoint>
        <byte-array-to-string-transformer
            doc:name="Byte Array to String" />
        <scripting:component doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[return payload.split('\n');]]></scripting:text>
            </scripting:script>
        </scripting:component>
        <collection-splitter doc:name="Collection Splitter" />
        <choice doc:name="Choice">
            <when expression="#[groovy:payload != 'end']">
                <processor-chain>
                    <logger message="." level="INFO" doc:name="Process"/>
                    <vm:outbound-endpoint path="toFlow2" doc:name="VM"/>
                </processor-chain>
            </when>
            <otherwise>
                <processor-chain>
                    <logger message="|||| DONE" level="INFO" doc:name="DONE"/>
                </processor-chain>
            </otherwise>
        </choice>
    </flow>
    <flow name="flow2" doc:name="flow2" >
        <vm:inbound-endpoint path="toFlow2" doc:name="VM"/>
        <scripting:component doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[return payload.split(',');]]></scripting:text>
            </scripting:script>
        </scripting:component>
        <collection-splitter doc:name="Collection Splitter" />
        <logger message="|||||| #[payload]" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint path="toFlow3" doc:name="VM"/>
    </flow>
One option is to use a collection-aggregator to act as an accumulator, blocking the final flow action until all the messages have been processed. The trick is that each collection-splitter sets up a correlation group size that is only good for either the number of lines in the file or the number of columns in a line. But we want to accumulate until all columns of all lines have been processed. The solution is to first compute this value (i.e. the total number of expected messages) and then override whatever correlation group size the collection-splitters had calculated with that total.
Here is how I've done this (you'll note that I replaced all Groovy snippets with more Mule-3-esque MEL expressions):
<file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" />
<flow name="flow1" processingStrategy="synchronous">
    <file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" fileAge="100"
        connector-ref="inputFileConnector">
        <file:filename-regex-filter pattern="input.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <byte-array-to-string-transformer />
    <set-session-variable variableName="expectedMessageCount"
        value="#[org.mule.util.StringUtils.countMatches(message.payload, '\n') + org.mule.util.StringUtils.countMatches(message.payload, ',') - 1]" />
    <expression-transformer expression="#[message.payload.split('\n')]" />
    <collection-splitter enableCorrelation="IF_NOT_SET" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="#[sessionVars.expectedMessageCount]" />
    <choice>
        <when expression="#[message.payload != 'end']">
            <processor-chain>
                <logger message="." level="INFO" />
                <vm:outbound-endpoint path="toFlow2" />
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <logger message="|||| END" level="INFO" />
            </processor-chain>
        </otherwise>
    </choice>
</flow>
<flow name="flow2">
    <vm:inbound-endpoint path="toFlow2"/>
    <expression-transformer expression="#[message.payload.split(',')]" />
    <collection-splitter />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="#[sessionVars.expectedMessageCount]" />
    <logger message="|||||| #[message.payload]" level="INFO"/>
    <vm:outbound-endpoint path="toFinalizer" />
    <vm:outbound-endpoint path="toFlow3" />
</flow>
<flow name="finalizer">
    <vm:inbound-endpoint path="toFinalizer" />
    <collection-aggregator />
    <logger message="|||| DONE" level="INFO" />
</flow>
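To sanity-check the count before relying on the aggregator, it can help to log it. The following is only a minimal sketch and not part of the flow above: the logger is an assumed addition that would sit right after the set-session-variable element in flow1, and the three-line sample file in the comment is hypothetical.

```xml
<!-- Hypothetical input.csv, every line (including "end") terminated by a newline:
       a,b,c
       d,e,f
       end
     Newlines: 3, commas: 4, so expectedMessageCount = 3 + 4 - 1 = 6,
     which matches the 6 fields that flow2 forwards to the finalizer. -->
<logger message="Expecting #[sessionVars.expectedMessageCount] field messages" level="INFO" />
```

Note the assumption about the trailing newline: without one after "end", the same expression yields 5 for this sample file, so the aggregator would release its group before the last field arrives.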
NB. Alternatively, if using a collection-aggregator is an issue because it uses too much memory, you could use an expression component to decrement sessionVars.expectedMessageCount and a filter to let a message hit the final message processor only when the counter is back to 0.
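A rough sketch of that counter idea follows. It is not exactly the suggestion above: because each in-flight message may carry its own copy of the session, this version keeps the counter in a shared Spring bean rather than in sessionVars. The bean name pendingMessages is made up for illustration, and the sketch assumes that only one file is in flight at a time and that MEL's app.registry lookup is available in your Mule version.

```xml
<!-- Assumed shared counter; 'pendingMessages' is a hypothetical bean name -->
<spring:beans>
    <spring:bean id="pendingMessages" class="java.util.concurrent.atomic.AtomicInteger" />
</spring:beans>

<!-- In flow1, right after set-session-variable: arm the counter with the expected total -->
<expression-component>
    app.registry['pendingMessages'].set(sessionVars.expectedMessageCount)
</expression-component>

<!-- Replacement for the aggregating finalizer flow -->
<flow name="finalizer">
    <vm:inbound-endpoint path="toFinalizer" />
    <!-- Only the message that brings the counter down to 0 passes the filter -->
    <expression-filter expression="#[app.registry['pendingMessages'].decrementAndGet() == 0]" />
    <logger message="|||| DONE" level="INFO" />
</flow>
```

Compared with the collection-aggregator, nothing is buffered here, but the single shared counter means two files processed concurrently would interfere with each other; a per-file counter keyed by correlation ID would be needed in that case.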