WSO2 ESB: Address endpoint does not resume sending after backend recovery


I am using WSO2 ESB with RabbitMQ. I have two proxy services:

  • AMQPProducerSample receives messages via the HTTP transport and sends them to a RabbitMQ queue.
  • AMQPProxy works as a consumer for the RabbitMQ queue (via the rabbitmq transport); consumed messages are sent to the HTTP endpoint SampleEndPoint.

Everything works fine except one scenario:

  1. The backend service configured in SampleEndPoint goes down.
  2. New messages arrive and are published via AMQPProducerSample; delivery fails (which is expected, because my backend is down). In the console I can see:

    WARN - ConnectCallback Connection refused or failed for : mfb.localhost/127.0.0.1:80
    WARN - FaultHandler ERROR_CODE : 101503
    WARN - FaultHandler ERROR_MESSAGE : Error connecting to the back end
    WARN - FaultHandler ERROR_DETAIL : Error connecting to the back end
    WARN - FaultHandler ERROR_EXCEPTION : null
    WARN - FaultHandler FaultHandler : Endpoint [SampleEndPoint]
    
  3. My backend service recovers and is available again.

  4. New messages arrive and are published via AMQPProducerSample, but my endpoint no longer receives any of them. Recovery works only if the number of undelivered messages is below 5; in that case messages are delivered to the backend service successfully. Delivery always stops working after 5 (five) undelivered messages.

How can I solve this? Is there an option I need to set or change to make it work every time?

I am using WSO2 ESB 4.8.1

Below is my full config:

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
        <parameter name="cachableDuration">15000</parameter>
    </registry>
    <proxy name="AMQPProxy"
           transports="rabbitmq"
           startOnLoad="true"
           trace="enable">
        <description/>
        <target>
            <inSequence>
                <log level="full"/>
                <property name="OUT_ONLY" value="true"/>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                <send>
                    <endpoint key="SampleEndPoint"/>
                </send>
            </inSequence>
        </target>
        <parameter name="rabbitmq.queue.name">queue</parameter>
        <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
        <parameter name="rabbitmq.exchange.name">exchange</parameter>
        <parameter name="rabbitmq.queue.routing.key">route</parameter>
    </proxy>
    <proxy name="AMQPProducerSample"
           transports="http"
           startOnLoad="true"
           trace="disable">
        <description/>
        <target>
            <inSequence>
                <property name="OUT_ONLY" value="true"/>
                <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
                <property name="NO_KEEPALIVE" value="true" scope="axis2"/>
                <send>
                    <endpoint>
                        <address uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.routing.key=route&amp;rabbitmq.exchange.name=exchange"/>
                    </endpoint>
                </send>
            </inSequence>
            <outSequence>
                <send/>
            </outSequence>
        </target>
    </proxy>
    <endpoint name="SampleEndPoint">
        <address uri="http://mfb.localhost/">
            <timeout>
                <duration>1000</duration>
                <responseAction>discard</responseAction>
            </timeout>
            <suspendOnFailure>
                <errorCodes>-1</errorCodes>
                <progressionFactor>1.0</progressionFactor>
            </suspendOnFailure>
            <markForSuspension>
                <errorCodes>-1</errorCodes>
            </markForSuspension>
        </address>
    </endpoint>
    <sequence name="fault">
        <log level="full">
            <property name="MESSAGE" value="Executing default 'fault' sequence"/>
            <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
            <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
        </log>
        <drop/>
    </sequence>
    <sequence name="main">
        <in>
            <log level="full"/>
            <filter source="get-property('To')" regex="http://localhost:9000.*">
                <send/>
            </filter>
        </in>
        <out>
            <send/>
        </out>
        <description>The main sequence for the message mediation</description>
    </sequence>
</definitions> 

Solution

  • This is a known bug in the fresh (unpatched) ESB 4.8.1 pack. We fixed it later, so I recommend switching to the ESB 4.9.0 release, which includes the fix [1].

    [1] https://docs.wso2.com/display/ESB490/Downloading+the+Product
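
    Whichever release you run, it can help to make the endpoint's suspension behaviour explicit instead of relying on errorCodes -1. The fragment below is only a sketch of the standard Synapse address endpoint options, not a confirmed workaround for the 4.8.1 bug. The error codes (101503 is the code from the log in the question, 101504 is the connection timeout code) and all durations and retry counts are illustrative assumptions to adapt to your setup.

```xml
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="SampleEndPoint">
    <address uri="http://mfb.localhost/">
        <timeout>
            <duration>1000</duration>
            <responseAction>discard</responseAction>
        </timeout>
        <!-- Suspend the endpoint when the connection is refused (101503). -->
        <suspendOnFailure>
            <errorCodes>101503</errorCodes>
            <!-- Illustrative: first suspension lasts 1 s. -->
            <initialDuration>1000</initialDuration>
            <!-- Factor 1.0 keeps every later suspension at the same length. -->
            <progressionFactor>1.0</progressionFactor>
            <!-- Illustrative: never stay suspended longer than 10 s. -->
            <maximumDuration>10000</maximumDuration>
        </suspendOnFailure>
        <!-- On a connection timeout (101504), retry before suspending. -->
        <markForSuspension>
            <errorCodes>101504</errorCodes>
            <retriesBeforeSuspension>3</retriesBeforeSuspension>
            <retryDelay>1000</retryDelay>
        </markForSuspension>
    </address>
</endpoint>
```

    With settings like these, the endpoint is expected to become active again once the suspension period has elapsed, so messages sent after the backend recovers should get another delivery attempt.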