ActiveMQ/STOMP Clear Schedule Messages Pointed To Destination


I would like to remove messages that are scheduled to be delivered to a specific queue, but I'm finding the process unnecessarily burdensome.

Here I am sending a test message to a queue with a delay:

self._connection.send(body="test", destination=f"/queue/my-queue", headers={
    "AMQ_SCHEDULED_DELAY": 100_000_000,
    "foo": "bar"
})

And here I would like to clear the scheduled messages for that queue:

self._connection.send(destination=f"ActiveMQ.Scheduler.Management", headers={
    "AMQ_SCHEDULER_ACTION": "REMOVEALL",
}, body="")

Of course, the "destination" here needs to be ActiveMQ.Scheduler.Management instead of my actual queue. But I can't find any way to delete only the scheduled messages that are destined for /queue/my-queue. I tried using the selector header, but that doesn't seem to work for AMQ_SCHEDULER_ACTION messages.

The only suggestion I've seen is to write a consumer that browses all of the scheduled messages, inspects each one for its destination, and deletes each schedule by its ID. This seems insane to me, as I don't have just a handful of messages but many millions that I'd like to delete.

Is there a way I could send a command to ActiveMQ to clear scheduled messages with a custom header value?

Maybe I can define a custom scheduled messages location for each queue?

Edit:

I've written a wrapper around the stomp.py connection to handle purging schedules destined for a queue. The MQStompFacade takes an existing stomp.Connection and the name of the queue you are working with and provides enqueue, enqueue_many, receive, purge, and move.

When receiving from a queue, if include_delayed is True, it will subscribe to both the queue and a topic that consumes the schedules. Assuming the messages were enqueued with this class and have the name of the original destination queue as a custom header, scheduled messages that aren't destined for the receiving queue will be filtered out.
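
The selector filtering described above can be sketched on its own, without a broker. This is a minimal illustration, assuming messages carry a custom `queue` header as in the wrapper; `build_schedule_selector` is a hypothetical helper name, and the quote doubling follows the SQL-92 string literal rules that JMS selectors use.

```python
from typing import Optional


def build_schedule_selector(queue: str, selector: Optional[str] = None) -> str:
    """Build a JMS selector matching scheduled messages tagged for `queue`."""
    # JMS selectors use SQL-92 string literals: a single quote is escaped by doubling it.
    escaped = queue.replace("'", "''")
    combined = f"queue = '{escaped}'"
    if selector:
        # Parenthesise the caller's selector so an OR inside it cannot bypass the queue filter.
        combined = f"{combined} AND ({selector})"
    return combined


print(build_schedule_selector("my-queue"))
# → queue = 'my-queue'
print(build_schedule_selector("my-queue", "foo = 'bar'"))
# → queue = 'my-queue' AND (foo = 'bar')
```

The resulting string is what would be passed as the `selector` when subscribing to the topic that receives the browsed schedules.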

Not yet tested in production. There is probably a lot of room for optimization here.

Usage:

stomp = MQStompFacade(connection, "my-queue")

stomp.enqueue_many([
  EnqueueRequest(message="hello"),
  EnqueueRequest(message="goodbye", delay_millis=100_000)
])

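stomp.purge()  # <- removes queued and scheduled messages destined for "/queue/my-queue"

import time
from typing import List, Optional

from stomp import Connection, ConnectionListener

# EnqueueRequest, Message, MessageAttributes, AbstractQueueFacade and rand_string
# are the author's own helpers and are not shown here.


class MQStompFacade(ConnectionListener):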

    def __init__(self, connection: Connection, queue: str):
        self._connection = connection
        self._queue = queue
        self._messages: List[Message] = []
        self._connection_id = rand_string(6)
        self._connection.set_listener(self._connection_id, self)

    def __del__(self):
        self._connection.remove_listener(self._connection_id)

    def enqueue_many(self, requests: List[EnqueueRequest]):
        txid = self._connection.begin()
        for request in requests:
            headers = request.headers or {}

            # Used in scheduled message selectors
            headers["queue"] = self._queue

            if request.delay_millis:
                headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
            if request.priority is not None:
                headers['priority'] = request.priority

            self._connection.send(body=request.message,
                                  destination=f"/queue/{self._queue}",
                                  transaction=txid,  # STOMP header name for transactional sends
                                  headers=headers)
        self._connection.commit(txid)

    def enqueue(self, request: EnqueueRequest):
        self.enqueue_many([request])

    def purge(self, selector: Optional[str] = None):
        num_purged = 0
        # include_delayed=True so that scheduled messages are purged as well
        for _ in self.receive(idle_timeout=5, selector=selector, include_delayed=True):
            num_purged += 1
        return num_purged

    def move(self, destination_queue: AbstractQueueFacade,
             selector: Optional[str] = None):

        buffer_size = 500
        move_buffer = []

        for message in self.receive(idle_timeout=5, selector=selector):
            move_buffer.append(EnqueueRequest(
                message=message.body
            ))

            if len(move_buffer) >= buffer_size:
                destination_queue.enqueue_many(move_buffer)
                move_buffer = []

        if move_buffer:
            destination_queue.enqueue_many(move_buffer)

    def receive(self,
                max: Optional[int] = None,
                timeout: Optional[int] = None,
                idle_timeout: Optional[int] = None,
                selector: Optional[str] = None,
                peek: Optional[bool] = False,
                include_delayed: Optional[bool] = False):
        """
        Receive messages until one of the following conditions is met.

        Args:
            max: Stop after this number of messages has been received
            timeout (seconds): Stop once this much time has passed since receiving started
            idle_timeout (seconds): Stop once the queue has been idle for this amount of time
            selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
            peek: Set to True to disable automatic ack of matched messages. Peeked messages remain in the queue
            include_delayed: Set to True to also return messages scheduled for delivery in the future
        """
        self._connection.subscribe(f"/queue/{self._queue}",
                                   id=self._connection_id,
                                   ack="client",
                                   selector=selector
                                   )
        if include_delayed:
            browse_topic = f"/topic/scheduled_{self._queue}_{rand_string(6)}"
            schedule_selector = f"queue = '{self._queue}'"
            if selector:
                schedule_selector = f"{schedule_selector} AND ({selector})"

            self._connection.subscribe(browse_topic,
                                       id=self._connection_id,
                                       ack="auto",
                                       selector=schedule_selector
                                       )

            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",
                headers={
                    "AMQ_SCHEDULER_ACTION": "BROWSE",
                    "JMSReplyTo": browse_topic
                },
                id=self._connection_id,
                body=""
            )

        listen_start = time.time()
        last_receive = time.time()
        messages_received = 0
        scanning = True
        empty_receive = False
        while scanning:
            try:
                message = self._messages.pop(0)  # oldest first, preserving arrival order
                last_receive = time.time()
                if not peek:
                    self._ack(message)
                messages_received += 1
                yield message
            except IndexError:
                empty_receive = True
                time.sleep(0.1)

            if max and messages_received >= max:
                scanning = False
            elif timeout and time.time() > listen_start + timeout:
                scanning = False
            elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
                scanning = False
            else:
                scanning = True

        self._connection.unsubscribe(id=self._connection_id)

    def on_message(self, frame):
        destination = frame.headers.get("original-destination", frame.headers.get("destination"))
        schedule_id = frame.headers.get("scheduledJobId")

        message = Message(
            attributes=MessageAttributes(
                id=frame.headers["message-id"],
                schedule_id=schedule_id,
                timestamp=frame.headers["timestamp"],
                queue=destination.replace("/queue/", "")
            ),
            body=frame.body
        )
        self._messages.append(message)

    def _ack(self, message: Message):
        """
        Deletes the message from the queue.
        If the message has a schedule_id, also removes the associated scheduled job.
        """
        if message.attributes.schedule_id:
            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",
                headers={
                    "AMQ_SCHEDULER_ACTION": "REMOVE",
                    "scheduledJobId": message.attributes.schedule_id
                },
                id=self._connection_id,
                body=""
            )
        self._connection.ack(message.attributes.id, subscription=self._connection_id)
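
The class above refers to several helpers that are not shown (`EnqueueRequest`, `Message`, `MessageAttributes`, `rand_string`). The following is only a guess at what they might look like, with field names inferred from the attributes the class reads (`request.delay_millis`, `message.attributes.schedule_id`, and so on); the real definitions may differ. `AbstractQueueFacade` is left out because nothing in the post describes it beyond `enqueue_many`.

```python
import random
import string
from dataclasses import dataclass
from typing import Any, Dict, Optional


def rand_string(length: int) -> str:
    """Hypothetical helper: random lowercase/digit string, used for ids and topic suffixes."""
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


@dataclass
class EnqueueRequest:
    """Hypothetical request type; enqueue_many reads these four fields."""
    message: str
    delay_millis: Optional[int] = None
    priority: Optional[int] = None
    headers: Optional[Dict[str, Any]] = None


@dataclass
class MessageAttributes:
    """Hypothetical metadata type; on_message fills these from the STOMP frame headers."""
    id: str
    schedule_id: Optional[str]
    timestamp: str
    queue: str


@dataclass
class Message:
    """Hypothetical message type: frame body plus its attributes."""
    attributes: MessageAttributes
    body: str


request = EnqueueRequest(message="goodbye", delay_millis=100_000)
print(request.delay_millis)  # → 100000
```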

Solution

  • To remove specific messages you need to know their IDs, which you can get by browsing the scheduled messages. The only other option is to use the start and end time options of the remove operation to remove all messages scheduled within a time range.

    MessageProducer producer = session.createProducer(management);
    Message request = session.createMessage();
    request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
    request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
    request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
    producer.send(request);
    

    If that doesn't suit your needs, I'm sure the project would welcome contributions.
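
For a STOMP client such as the stomp.py connection used in the question, the same range removal might look roughly like the sketch below. It makes several assumptions that should be checked against your broker: that the Java constants resolve to the header names `ACTION_START_TIME` and `ACTION_END_TIME`, that the times are epoch milliseconds, and that the management destination is addressed the same way as in the question. `remove_scheduled_between` and `RecordingConnection` are hypothetical names; the latter only stands in for a real connection so the example runs without a broker. The range applies to every scheduled job in that window, regardless of its target queue.

```python
import time

SCHEDULER_DESTINATION = "ActiveMQ.Scheduler.Management"  # as used in the question


def remove_scheduled_between(connection, start_ms: int, end_ms: int) -> dict:
    """Ask the broker to drop all scheduled jobs due between start_ms and end_ms."""
    headers = {
        "AMQ_SCHEDULER_ACTION": "REMOVEALL",
        # Assumed header names for the Java START_TIME / END_TIME constants.
        "ACTION_START_TIME": str(start_ms),
        "ACTION_END_TIME": str(end_ms),
    }
    connection.send(destination=SCHEDULER_DESTINATION, headers=headers, body="")
    return headers


class RecordingConnection:
    """Stand-in that records send() calls instead of talking to a broker."""

    def __init__(self):
        self.sent = []

    def send(self, destination, body="", headers=None):
        self.sent.append((destination, body, headers))


conn = RecordingConnection()
now_ms = int(time.time() * 1000)
# Remove everything scheduled to fire within the next hour.
remove_scheduled_between(conn, now_ms, now_ms + 60 * 60 * 1000)
print(conn.sent[0][2]["AMQ_SCHEDULER_ACTION"])  # → REMOVEALL
```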