Search code examples
javaakkaactivemq-classicakka-streamalpakka

Manual Acknowledgement of ActiveMQ Messages with Alpakka


I am working on implementing Akka Alpakka for consuming from and producing to ActiveMQ queues, in Java. I can consume from the queue successfully, but I haven't yet been able to implement application-level message acknowledgement.

My goal is to consume messages from a queue and send them to another actor for processing. When that actor has completed processing, I want it to be able control the acknowledgement of the message in ActiveMQ. Presumably this would be done by sending a message to another actor that can do the acknowledgement, calling an acknowledge function on the message itself, or some other way.

In my test, 2 messages are put into the AlpakkaTest queue, and then this code attempts to consume and acknowledge them. However, I don't see a way to set the ActiveMQ session to CLIENT_ACKNOWLEDGE, and I don't see any difference in behavior with or without the call to m.acknowledge();. Because of this, I think messages are still being auto-acknowledged.

Does anyone know the accepted way to configure ActiveMQ sessions for CLIENT_ACKNOWLEDGE and manually acknowledge ActiveMQ messages in Java Akka systems using Alpakka?

The relevant test function is:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.

Source<Message, NotUsed> jmsSource = JmsSource.create(
    JmsSourceSettings.create(connectionFactory)
        .withQueue("AlpakkaTest")
        .withBufferSize(2)
);

Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.

try {
    List<Message> messages = jmsSource
        .take(2)
        .runWith(Sink.seq(), materializer)
        .toCompletableFuture().get(4, TimeUnit.SECONDS);

    for(Message m:messages) {
        System.out.println("Found Message ID: " + m.getJMSMessageID());

        try {
            m.acknowledge();
        } catch(JMSException jmsException) {
            System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
        }
    }
} catch (InterruptedException e1) {
    e1.printStackTrace();
} catch (ExecutionException e1) {
    e1.printStackTrace();
} catch (TimeoutException e1) {
    e1.printStackTrace();
} catch (JMSException e) {
    e.printStackTrace();
}

This code prints:

Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1

Solution

  • Update: The acknowledgement mode is configurable in the JMS connector since Alpakka 0.15. From the linked documentation:

    Source<Message, NotUsed> jmsSource = JmsSource.create(JmsSourceSettings
        .create(connectionFactory)
        .withQueue("test")
        .withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge())
    );
    
    CompletionStage<List<String>> result = jmsSource
        .take(msgsIn.size())
        .map(message -> {
            String text = ((ActiveMQTextMessage)message).getText();
            message.acknowledge();
            return text;
        })
        .runWith(Sink.seq(), materializer);
    

    As of version 0.11, Alpakka's JMS connector does not support application-level message acknowledgment. Alpakka creates internally a Session with the CLIENT_ACKNOWLEDGE mode here and acknowledges each message here in the internal MessageListener. The API does not expose these settings for overriding.

    There is an open ticket that discusses enabling downstream acknowledgement of queue-based sources, but that ticket has been inactive for a while.

    Currently you cannot prevent Alpakka from acknowledging the messages at the JMS level. However, that doesn't preclude you from adding a stage to your stream that sends each message to an actor for processing and uses the actor's replies as backpressure signals. The Akka Streams documentation describes how to do this with either a combination of mapAsync and ask or with Sink.actorRefWithAck. For example, to use the former:

    Timeout askTimeout = Timeout.apply(4, TimeUnit.SECONDS);
    
    jmsSource
        .mapAsync(2, msg -> ask(processorActor, msg, askTimeout))
        .runWith(Sink.seq(), materializer);
    

    (Side note: In the related Streamz project, there is a recently opened ticket to allow application-level acknowledgement. Streamz is the replacement for the old akka-camel module and, like Alpakka, is built on Akka Streams. Streamz also has a Java API and is listed in the Alpakka documentation as an external connector.)