We're using Symfony Messenger, and have these transports:
framework:
messenger:
failure_transport: failed
transports:
failed:
dsn: 'doctrine://default?queue_name=failed'
options:
table_name: 'MessengerMessages'
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
retry_strategy:
max_retries: 3
delay: 5000
multiplier: 2
max_delay: 0
asyncLowPriority:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
retry_strategy:
max_retries: 5
delay: 3600000
multiplier: 2
max_delay: 0
sync: 'sync://'
When we send a message to the async
queue, and the last retry fails with an exception, the exception is logged to the MessengerMessages
table, and the exception bubbles up (goes to Sentry in our case). This is what we want.
When we send a message to the asyncLowPriority
queue however, we would like failed messages to:
failed
transportBasically, the exception should be dropped.
Is this possible, and how?
The reason is that we're using this queue for downloading images asynchronously, and we already log each failure in a dedicated database table in the command handler.
I managed to do this with a middleware:
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Throwable;
final class BypassFailureTransportMiddleware implements MiddlewareInterface
{
public function __construct(
private string $transportName,
private int $maxRetries,
) {
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
try {
return $stack->next()->handle($envelope, $stack);
} catch (HandlerFailedException $exception) {
$nestedException = $this->getNestedException($exception);
if ($nestedException === null) {
throw $exception;
}
/** @var ReceivedStamp|null $receivedStamp */
$receivedStamp = $envelope->last(ReceivedStamp::class);
if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) {
throw $exception;
}
if (!$this->isLastRetry($envelope, $nestedException)) {
throw $exception;
}
return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
}
}
private function getNestedException(HandlerFailedException $exception): ?Throwable
{
$nestedExceptions = $exception->getNestedExceptions();
if (count($nestedExceptions) === 1) {
return $nestedExceptions[0];
}
return null;
}
private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
{
if ($nestedException instanceof UnrecoverableMessageHandlingException) {
return true;
}
/** @var RedeliveryStamp|null $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
if ($redeliveryStamp === null) {
return false;
}
return $redeliveryStamp->getRetryCount() === $this->maxRetries;
}
}
It must be configured with the name of the transport and the configured max_retries
of this transport:
parameters:
async_allow_failure_transport_name: 'asyncAllowFailure'
async_allow_failure_max_retries: 5
services:
command.bus.bypass_failure_transport_middleware:
class: App\Infrastructure\CommandBus\Middleware\BypassFailureTransportMiddleware
arguments:
$transportName: '%async_allow_failure_transport_name%'
$maxRetries: '%async_allow_failure_max_retries%'
framework:
messenger:
transports:
- name: '%async_allow_failure_transport_name%'
dsn: '...'
retry_strategy:
max_retries: '%async_allow_failure_max_retries%'
delay: 1000
multiplier: 2
max_delay: 0
buses:
command.bus:
middleware:
- 'command.bus.bypass_failure_transport_middleware'