
How to fetch a fixed number of rows from a DB at a regular time interval using Mule


I have a Mule flow that needs to fetch rows from a database and write them to a file. The database has 100 rows, and I need to fetch 5 rows at a time and write them to a file, then after an interval of, say, 30 seconds, fetch another 5 rows and write that payload to a file. My flow is as follows:

 <spring:beans>
        <spring:bean id="DB_Source" name="DB_Source" class="org.enhydra.jdbc.standard.StandardDataSource">
            <spring:property name="url" value="${url}"/>
            <spring:property name="driverName" value="${driverName}"/>
        </spring:bean>
     </spring:beans>
    <jdbc-ee:connector name="Database_Global" dataSource-ref="DB_Source" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" transactionPerMessage="true">
        <!-- Setting transactionPerMessage="false" retrieves all the rows at once -->
         <jdbc-ee:query key="RetriveQuery" value="select * from getData"/>  <!-- or we can use CALL sp_retrieveData(@Id=13) -->
    </jdbc-ee:connector>
    <context:property-placeholder location="classpath:conf/DBConnectionProp.properties"/>



    <flow name="InboundJDBC" doc:name="InboundJDBC" initialState="started">
        <jdbc-ee:inbound-endpoint  queryTimeout="-1" pollingFrequency="1000" doc:name="Database"   connector-ref="Database_Global" queryKey="RetriveQuery">

         <jdbc-ee:transaction action="ALWAYS_BEGIN" />

        <!--  <property key="receiveMessageInTransaction" value="true"/> --><!-- Enable this to receive all the rows at once -->
        </jdbc-ee:inbound-endpoint>
        <mulexml:object-to-xml-transformer doc:name="Object to XML"/>

      <message-properties-transformer doc:name="Message Properties"> 
      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/> <!-- Number of rows to aggregate into one group -->
      <add-message-property key="MULE_CORRELATION_ID" value="1"/> 
      </message-properties-transformer> 
      <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Collection Aggregator"/>

        <logger message="JDBC Transaction #[message.payload] **************" level="INFO" doc:name="Logger"/>
        <file:outbound-endpoint path="E:\backup\test\ss" outputPattern="#[java.util.UUID.randomUUID().toString()].txt" responseTimeout="10000" doc:name="File"/>

    </flow>  
</mule>

The issue is that when the application starts, it fetches only 5 of the 100 rows from the DB and writes them to a file; the remaining rows are never fetched and no new files are created. I want to fetch 5 rows every 30 seconds and write each batch to a new file. Am I doing anything wrong? I used the following as a reference: How do I get a Mule to return multiple rows from a JDBC query as a single transaction?

Updated flow:

<flow name="InboundJDBC" doc:name="InboundJDBC" initialState="started">
        <jdbc-ee:inbound-endpoint  queryTimeout="-1" pollingFrequency="1000" doc:name="Database"   connector-ref="Database_Global" queryKey="RetriveQuery">

         <jdbc-ee:transaction action="ALWAYS_BEGIN" />

     <!--  <property key="receiveMessageInTransaction" value="true"/> --><!-- Enable this to receive all the rows at once -->
        </jdbc-ee:inbound-endpoint>
        <set-property propertyName="#[message.inboundProperties['requestId']]" value="#[java.util.UUID.randomUUID().toString()]" doc:name="Property"/>

        <mulexml:object-to-xml-transformer doc:name="Object to XML"/>

      <message-properties-transformer doc:name="Message Properties"> 

      <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/> <!-- Number of rows to aggregate into one group -->
      <add-message-property key="MULE_CORRELATION_ID" value="#[message.inboundProperties['requestId']]"/> 
      </message-properties-transformer> 
      <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Collection Aggregator"/>

        <logger message="JDBC Transaction #[message.payload] **************" level="INFO" doc:name="Logger"/>
        <file:outbound-endpoint path="E:\backup\test\ss" outputPattern="#[java.util.UUID.randomUUID().toString()].txt" responseTimeout="10000" doc:name="File"/>

    </flow>

Now it creates one file per row.


Solution

  • I see several issues:

    • The query selects all rows instead of only 5.
    • There is no update query to mark the selected records, so the same records will be picked up again and again.
    • The correlation ID is fixed: <add-message-property key="MULE_CORRELATION_ID" value="1"/>. Because of this, the collection-aggregator aggregates 5 messages for this ID and stops there; any later message arriving with the same ID is discarded. Instead, use a MEL expression that produces the same value for each group of five rows. Which expression to use is up to you; for example, it could be some sort of modulo of the time that yields a constant value within a 30-second window.
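
The three points above could be sketched roughly as follows. This is an untested fragment, not a drop-in fix. The columns `id` and `processed` are hypothetical (the question does not show the table schema), `select top 5` assumes SQL Server syntax, and the `.ack` suffix relies on the JDBC transport's convention of running an acknowledgment query named `<queryKey>.ack` for each row read. The correlation ID expression is just one possible way to get a value that stays constant within a 30-second window.

```xml
<!-- Fragment only: goes inside the existing <mule> element. -->
<jdbc-ee:connector name="Database_Global" dataSource-ref="DB_Source" validateConnections="true" queryTimeout="-1" pollingFrequency="30000" transactionPerMessage="true" doc:name="Database">
    <!-- Read at most 5 unprocessed rows per poll.
         ASSUMPTION: "id" and "processed" columns exist; "top 5" is SQL Server syntax
         (other databases use LIMIT 5, ROWNUM, etc.). -->
    <jdbc-ee:query key="RetriveQuery" value="select top 5 * from getData where processed = 0 order by id"/>
    <!-- Ack query: marks each row that was read so it is not selected again.
         ASSUMPTION: the map key matches the column name as returned by the driver. -->
    <jdbc-ee:query key="RetriveQuery.ack" value="update getData set processed = 1 where id = #[map-payload:id]"/>
</jdbc-ee:connector>

<flow name="InboundJDBC" doc:name="InboundJDBC" initialState="started">
    <!-- Poll every 30 seconds (pollingFrequency is in milliseconds) -->
    <jdbc-ee:inbound-endpoint queryTimeout="-1" pollingFrequency="30000" connector-ref="Database_Global" queryKey="RetriveQuery" doc:name="Database">
        <jdbc-ee:transaction action="ALWAYS_BEGIN"/>
    </jdbc-ee:inbound-endpoint>
    <mulexml:object-to-xml-transformer doc:name="Object to XML"/>

    <message-properties-transformer doc:name="Message Properties">
        <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="5"/>
        <!-- Current time rounded down to the start of its 30-second window,
             so rows read in the same window share one correlation ID.
             Converted to a String because the correlation ID is read as a String. -->
        <add-message-property key="MULE_CORRELATION_ID" value="#[java.lang.String.valueOf(java.lang.System.currentTimeMillis() - (java.lang.System.currentTimeMillis() % 30000))]"/>
    </message-properties-transformer>
    <collection-aggregator timeout="5000" failOnTimeout="false" doc:name="Collection Aggregator"/>

    <file:outbound-endpoint path="E:\backup\test\ss" outputPattern="#[java.util.UUID.randomUUID().toString()].txt" responseTimeout="10000" doc:name="File"/>
</flow>
```

With this arrangement each poll should yield one group of up to five rows and therefore one file. Rows that happen to straddle a window boundary would land in two groups, and with failOnTimeout="false" the aggregator should release such partial groups when its timeout expires, so a different grouping key may be preferable if exact batches of five matter.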