Search code examples
muleamqp

Problems with sync AMQP connector in Mule


I'm trying to process a queue of XML objects in Mule in a durable manner, which have been split from an original xml file, and are then being routed using a "choice" component. Each leg of the choice component leads to an AMQP endpoint with a different queue. The other end of each queue is another Mule flow which is supposed to read the queue, do something to the XML and return it as a reply. All the AMQP endpoints are set as request-response.

The flow seems to work properly up until it puts something onto the AMQP queue, but then just continues on immediately rather than waiting for the message.

This is also born out by the remote flow which seems to read the queue correctly, but then seems to reply immediately before then continuing on to do it's processing correctly. At the end of this it should then reply to the message, but seems not to do so.

Here are some code fragments in case anyone can point out where I'm going wrong...

Main Router Flow

    <amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector"  validateConnections="true"/>
    <flow name="routerFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" />
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
    <splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/>
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
    <set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" />
    <choice doc:name="Route by Config Item Type">
        <when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']">
            <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/>
            <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
            <logger message="&quot;Back from DirectoryQueue with &quot; + #[payload]" level="INFO" doc:name="Logger"/>
        </when>
        <when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']">
            <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/>
        </when>
    </choice>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>

Directory Queue Flow

    <flow name="directoryobjectFlow">
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true"   routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP">
        <http:request-builder>
            <http:header headerName="Content-Type" value="application/xml"/>
            <http:header headerName="Accept" value="application/xml"/>
        </http:request-builder>
    </http:request>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/>
    </flow>

and Group Policy Queue (which is currently set to show it does nothing)

    <flow name="amiab-esb-grouppolicyFlow">
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout"  exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true"   routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
    <set-payload doc:name="Set Payload" value="Nothing" />
    <logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/>
    </flow>

I'm pretty new to Mule, and just getting to grips with it so I'd be very grateful for any ideas or insights.

Thanks!


Solution

  • Please use the lastest version of the connector released today 3.6.2. This will execute without problems the flow with request-response endpoints.