
WSO2 ESB: Handling Retries and Error Codes in Message Processor with Multiple Message Stores


I'm developing an API using WSO2 ESB where I have configured two message stores and a message processor. In the scenario I'm facing, the message processor retries failed messages up to 5 times, but I want to implement a specific behavior:

  • When the message processor encounters a client error (i.e., a 4xx HTTP status code) while calling an endpoint through a proxy service, it should move the message directly to the second message store without retrying.
  • For other types of errors, the message processor should retry up to 5 times.

I've attempted to achieve this by filtering on the HTTP status code and directing 4xx errors to the second message store, and the message is indeed added there. However, the message processor still retries these messages and does not dequeue them from the first (current) queue. I expected the dequeue to succeed, since the message was already added to the second message store. After adding the message to the second message store, I drop it with <drop/>.

Message Processor

<?xml version="1.0" encoding="UTF-8"?>
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" messageStore="first-store" name="msg-processor" targetEndpoint="ProxyEndpoint" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="client.retry.interval">5000</parameter>
    <parameter name="member.count">1</parameter>
    <parameter name="message.processor.reply.sequence">reply-sequence</parameter>
    <parameter name="non.retry.status.codes">200</parameter>
    <parameter name="message.processor.deactivate.sequence">deactivate-sequence</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="max.delivery.attempts">5</parameter>
    <parameter name="message.processor.fault.sequence">fault-sequence</parameter>
    <parameter name="store.connection.retry.interval">1000</parameter>
    <parameter name="max.store.connection.attempts">-1</parameter>
    <parameter name="max.delivery.drop">Disabled</parameter>
    <parameter name="interval">10000</parameter>
</messageProcessor>

Proxy Service

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="ProxyService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <log level="full"/>
            <enrich>
                <source clone="true" type="body"/>
                <target property="data" type="property"/>
            </enrich>
            <call>
                <endpoint>
                    <http method="post" statistics="enable" trace="enable" uri-template="http://localhost:3001/transfers"></http>
                </endpoint>
            </call>
            <filter regex="200" source="$axis2:HTTP_SC">
                <then>
                    <log level="custom">
                        <property name="text" value="Endpoint Call Success"/>
                    </log>
                </then>
                <else>
                    <filter regex="4[0-9]{2}" source="get-property('axis2', 'HTTP_SC')">
                        <then>
                            <log level="custom">
                                <property name="text" value="Client Error Occurred while Sending Message"/>
                            </log>
                            <store messageStore="second-store"/>
                            <drop/>
                        </then>
                        <else>
                            <log level="custom">
                                <property name="text" value="Other Error Occurred while Sending Message"/>
                            </log>
                        </else>
                    </filter>
                </else>
            </filter>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

Is this happening because the message acknowledgment is not being sent? Should I implement an acknowledgment mechanism, and if so, how? Do I need to use class mediators, or is there another approach I should consider to achieve this behavior? What am I doing wrong here? Thanks in advance.


Solution

  • In the filter that checks for 4xx status codes, replace the <drop/> with the property below, placed after the <store> mediator, so that the status code is set to 200 once the message has been added to the second store. The proxy then sends a 200 response back to the message processor, and the message is acknowledged and removed from the first store.

    <property name="HTTP_SC" value="200" scope="axis2"/>
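
  • For context, here is a sketch of how the inner filter from the question's proxy might look with that change applied. It assumes the rest of the proxy stays as posted, so the flow continues on to the existing <respond/> after the filter. The XML comment is added for illustration only.

    <filter regex="4[0-9]{2}" source="get-property('axis2', 'HTTP_SC')">
        <then>
            <log level="custom">
                <property name="text" value="Client Error Occurred while Sending Message"/>
            </log>
            <store messageStore="second-store"/>
            <!-- Replaces <drop/>: report 200 so the processor acks the message in first-store -->
            <property name="HTTP_SC" value="200" scope="axis2"/>
        </then>
        <else>
            <log level="custom">
                <property name="text" value="Other Error Occurred while Sending Message"/>
            </log>
        </else>
    </filter>

    The else branch is left untouched, so other errors should still reach the processor with their original status code and be retried up to max.delivery.attempts.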