
Mule FailedToQueueEventException Quartz connection


I have the Quartz inbound endpoint below to trigger a Kafka event, but it seems to be throwing a SEDA queue exception.

<quartz:connector name="myQuartzConnector" validateConnections="true">
    <receiver-threading-profile maxThreadsActive="1"/>
</quartz:connector>

<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
    <quartz:inbound-endpoint jobName="Trigger-Kafka-Consumer-Quartz-Job" repeatInterval="1" responseTimeout="10000" connector-ref="myQuartzConnector" doc:name="Quartz">
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>

Quartz is used to trigger the Kafka consumer flow. Control does not return to the scheduler until the Kafka consumer connection ends in the Java component. The Kafka consumer connection should never end, as it runs in an endless while(true) loop. If the Kafka connection does happen to end, the Quartz scheduler should retrigger the Java component, which reopens the Kafka connection.

Message               : The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
Payload               : {NullPayload}
Payload Type          : org.mule.transport.NullPayload
Element               : null @ message-gateway-profile-update-api:null:null
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.api.service.FailedToQueueEventException: The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
    at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:139)
    at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:102)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:103)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)


Solution

  • The error occurs because the Quartz endpoint fires faster than the flow can process the events. The flow uses the default processing strategy, queued-asynchronous, which means that each event triggered by the Quartz endpoint is placed on a SEDA queue and then processed by flow threads as they become available. The endpoint is set to repeat every 1 ms (repeatInterval="1"), which is far too short: the component has very little chance of finishing in that time, and here it is designed not to return at all. Once the thread pool used by the flow is exhausted, the SEDA queue starts to fill. When the queue is full and a new event cannot be enqueued within the default timeout of 30000 ms, you receive the error. This issue is described in the KB article https://help.mulesoft.com/s/article/Error-The-queue-for-SEDA-queue-name-did-not-accept-new-event-within-30000-MILLISECONDS

    You could change the flow processing strategy to synchronous so that the flow runs on the connector's thread and the event is never placed on the SEDA queue, but the repeatInterval is still unrealistically small and should be increased.

    On a side note, the Quartz connector has been deprecated for a long time. It has been replaced by the Poll scope.
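
As a rough sketch of the synchronous suggestion above, the flow from the question could be declared as follows. The processingStrategy attribute is the standard Mule 3 flow attribute; the repeatInterval of 60000 ms is only an assumed placeholder, not a value from the original post, and should be chosen to match how quickly the consumer needs to be restarted. Whether overlapping triggers are fully prevented also depends on how the Quartz scheduler's own threads are configured, which is not verified here.

```xml
<!-- Sketch only: same flow as in the question, with two changes. -->
<!-- 1. processingStrategy="synchronous": the event is processed on the
        triggering thread instead of being handed to the SEDA queue. -->
<!-- 2. repeatInterval raised from 1 ms to 60000 ms (assumed value). -->
<flow name="quartz-scheduler-kafka-consumer-trigger-flow" processingStrategy="synchronous">
    <quartz:inbound-endpoint jobName="Trigger-Kafka-Consumer-Quartz-Job" repeatInterval="60000" responseTimeout="10000" connector-ref="myQuartzConnector" doc:name="Quartz">
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>
```

With this change no FailedToQueueEventException should be raised for this flow, since there is no longer a SEDA stage for the events to wait on.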