Search code examples
phpsymfonysymfony4symfony-messenger

Symfony MessageHandler count how many times a message has been dispatched


I'm using Symfony Messenger and I want to keep dispatching a message in the handler until it has been dispatched a number of times.

How can I keep track of that?

This is the code of my handler class so far:

class RetryTestHandler implements MessageHandlerInterface
{
    /**
    * @var EntityManagerInterface
    */
    private $entityManager;
    /**
     * @var MessageBusInterface
     */
    private $bus;

    public function __construct(MessageBusInterface $bus, EntityManagerInterface $entityManager)
    {
        $this->entityManager = $entityManager;
        $this->bus = $bus;
    }

    public function __invoke(RetryTest $message)
    {
        // TODO: Keep dispatching message until it has been dispatched 10 times?
        $this->bus->dispatch(new RetryTest("This is a test!"), [
            new DelayStamp(5000)
        ]);
    }
}


Solution

  • To add metadata to your messages, you can use stamps.

    Which you can later use in your own custom middleware.

    E.g. for this custom StampInterface implementing class:

    class LoopCount implements StampInterface {
    
    
        private int $count; 
    
        public function __construct($count) {
            $this->count = $count;
        }
    
        public function getCount(): int {
            return $this->count;
        } 
    }
    

    Then create your own middleware that checks for this stamp and re-dispatches after handling:

    class ResendingMiddleware implements MiddlewareInterface
    {
         private $bus;
    
         public function __construct(MessageBusInterface $bus) {
               $this->bus = $bus;
        }
    
        public function handle(Envelope $envelope, StackInterface $stack): Envelope
        {
    
            $envelope = $stack->next()->handle($envelope, $stack);
    
            if (null !== $stamp = $envelope->last(LoopCount::class)) {
                $count = $stamp->getCount();
            } else {
                return $envelope;
            }
    
            // Stop dispatching
            if ($count > 9) {
                return $envelope;
            }
    
            $this->bus->dispatch(new RetryTest("Dit is een test"), [
                new DelayStamp(5000),
                new LoopCount($count + 1)
            ]);
    
            return $envelope;
        }
    

    If it was processed more than 9 times, consume the message without doing anything.

    You need also to add the middleware to the configuration:

    framework:
        messenger:
            buses:
                messenger.bus.default:
                    middleware:
                        # service ids that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
                        - 'App\Middleware\ResendingMiddleware'
    

    I wrote this in a hurry and can't test it right now, but the base should help you go in the right direction. Test and debug, and you'll get it working. I'll get back to this later to try to see if there is anything missing