
ActiveMQ Artemis redelivery delay using AMQP annotations


In Artemis, when using AMQP, there is a message annotation called x-opt-delivery-time that asks the broker to deliver the message at a later time: https://activemq.apache.org/components/artemis/documentation/latest/amqp.html#amqp-scheduling-message-delivery

This works fine when the annotation is set on a newly sent message, but it does not appear to work when it is set through a disposition frame with the modified outcome.

I wrote this quick Go program to demonstrate the issue:

package main

import (
    "context"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)

func main() {
    // create connection
    opts := &amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain("admin", "admin"),
    }
    conn, err := amqp.Dial(context.TODO(), "amqp://0.0.0.0:13001", opts)
    if err != nil {
        panic(err)
    }
    // create session
    session, err := conn.NewSession(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    // create a new sender
    sender, err := session.NewSender(context.TODO(), "test.queue", nil)
    if err != nil {
        panic(err)
    }
    // create a new receiver
    receiver, err := session.NewReceiver(context.TODO(), "test.queue", nil)
    if err != nil {
        panic(err)
    }

    // send test message
    msg := amqp.NewMessage([]byte{1})
    msg.Annotations = make(amqp.Annotations)
    msg.Annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
    log.Printf("sending msg")
    err = sender.Send(context.TODO(), msg, nil)
    if err != nil {
        panic(err)
    }

    // receive the test message
    msg, err = receiver.Receive(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    log.Printf("received initial msg")
    annotations := msg.Annotations
    if annotations == nil {
        annotations = make(amqp.Annotations)
    }
    annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
    err = receiver.ModifyMessage(context.TODO(), msg, &amqp.ModifyMessageOptions{
        DeliveryFailed:    true,
        UndeliverableHere: false,
        Annotations:       annotations,
    })
    if err != nil {
        panic(err)
    }

    // receive the modified test message
    msg, err = receiver.Receive(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    log.Printf("received modified msg")
    // accept the message so the broker removes it from the queue
    if err = receiver.AcceptMessage(context.TODO(), msg); err != nil {
        panic(err)
    }
}

The program correctly delays delivery of the initial message by 10 seconds, but redelivery is not delayed after the modified disposition: the message comes back immediately.

Here is the output when I run the program:

2024/07/28 21:02:34 sending msg
2024/07/28 21:02:44 received initial msg
2024/07/28 21:02:44 received modified msg

Any idea whether this is a bug in Artemis, or whether there is another way to have a message redelivered at a specific time?


Solution

  • Apache ActiveMQ Artemis ignores the message annotations in a modified delivery state; see https://github.com/apache/activemq-artemis/blob/2.36.0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java#L415

    You could create an issue to request this feature at https://issues.apache.org/jira/browse/ARTEMIS