
ActiveMQ Artemis server produces lots of AMQ224016 errors in logs after migration to Jakarta API client


I encountered a problem with the ActiveMQ Artemis server: it logs AMQ224016 errors. I first saw it on version 2.20.0, and it persists after upgrading the server to 2.32.0 with different client combinations (clients 2.20 or 2.32 connecting to server 2.32). The problem occurred after migrating from Spring Boot 2.7 to 3.x (which changed jakarta.jms-api from 2.0.3 to 3.1.0 and required jakarta imports).

Here is an example stack trace from the artemis.log file:

2024-03-18 12:08:46,069 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception
org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ229027: Could not find reference on consumer ID=0, messageId = 25612055904 queue = example.queue
    at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.individualAcknowledge(ServerConsumerImpl.java:1013) ~[artemis-server-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.individualAcknowledge(ServerSessionImpl.java:1314) ~[artemis-server-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.slowPacketHandler(ServerSessionPacketHandler.java:618) ~[artemis-server-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:319) ~[artemis-server-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:32) ~[artemis-commons-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57) ~[artemis-commons-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32) ~[artemis-commons-2.32.0.jar:2.32.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.32.0.jar:2.32.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.32.0.jar:2.32.0]

This stack trace is logged for every handled message, which in my case generates over 500MB of logs per day; before that, the server produced only about 100KB of logs per day.

There was no problem with the client in an older service running Java 8 and artemis-jms-client:2.19.1 (because 2.20.0+ clients are meant for Java 11+), nor in another service that used Java 17, Spring Boot 2.7 and artemis-jms-client:2.20.0. For testing I created an example application running on Java 17 with the newest 2.32.0 client, where the only change is the Spring Boot version, from 2.7.18 to 3.0.x/3.1.x/3.2.x, along with jakarta imports (no changes in the listener implementation). On the older Spring Boot there is no problem on the server side.

Furthermore, I tried different combinations of Artemis instances: a newly created instance with default configuration and a clustered instance (2 primary, 2 secondary), each with a default and a configured queue. With every combination the effect is the same: on the older jms-api it works and on the newer one it does not.

I see that Artemis supports the JMS 3.1.0 API (Jakarta EE 10) from version 2.23.0.

What can I do about this, and what could be missing or incompatible?

MRE

Requirements to reproduce the problem:

  • an instance of Artemis server 2.32.0 with default configuration (client connecting to the TCP host on port 61616)
  • a client application with a listener using Spring Boot 3.x and the Jakarta JMS 3.x API

Full example repository (with sender): https://github.com/g4iner/artemis-tester/ and gist: https://gist.github.com/g4iner/ad2ad8d09a56aa538809833739866d54

My listener implementation (handles ExampleMessage from "example.queue"):

ExampleMqListener.kt

@Component
class ExampleMqListener : MqListener<ExampleMessage> {
    override fun handleMessage(message: ExampleMessage): Boolean {
        return runBlocking {
            LoggerFactory.getLogger(ExampleMqListener::class.java).info("Incoming ExampleMessage: $message")
            true
        }
    }
}

data class ExampleMessage(val data: String) : MqMessage

class MqListenerImpl<T : MqMessage>(private val messageConverter: MessageConverter, private val handler: MqListener<T>) : MessageListener {
    override fun onMessage(jmsMessage: Message) {
        val message = messageConverter.fromMessage(jmsMessage) as? T ?: error("Unexpected message type")
        runCatching {
            handler.handleMessage(message)
        }.getOrElse {
            LoggerFactory.getLogger(MqListenerImpl::class.java).error("Unable to handle message", it)
        }
        jmsMessage.acknowledge()
    }
}

interface MqListener<T> {
    fun handleMessage(message: T): Boolean
}

MqListenerConfig.kt


@Configuration
class MqListenerConfig(
    override val listenerContainerFactory: DefaultJmsListenerContainerFactory,
    override val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry,
    override val messageConverter: MessageConverter,
    private val mqListeners: List<MqListener<out MqMessage>>,
) : JmsListenerConfig {
    @EventListener(ApplicationReadyEvent::class)
    fun startListeners() {
        mqListeners.forEach {
            listenToQueue(it)
        }
    }

    private fun <T : MqMessage> listenToQueue(handler: MqListener<T>) {
        handler.messageType().let {
            listen(
                listenerConfig = ListenerConfig(it.simpleName, it.simpleName, "example.queue", "1-2"),
                handler = handler,
                messageClass = it
            )
        }
    }

    private fun <T : MqMessage> MqListener<T>.messageType(): Class<T> = (javaClass.genericInterfaces[0] as ParameterizedType).actualTypeArguments[0] as Class<T>
}

@JsonIgnoreProperties(value = ["type"])
interface MqMessage : Serializable {
    val type: String
        get() = javaClass.simpleName
}

class ListenerConfig(
    val messageType: String,
    val id: String,
    val destination: String,
    val concurrency: String
)

interface JmsListenerConfig {
    val listenerContainerFactory: DefaultJmsListenerContainerFactory
    val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry
    val messageConverter: MessageConverter
}

fun <T : MqMessage> JmsListenerConfig.listen(
    listenerConfig: ListenerConfig,
    handler: MqListener<T>,
    messageClass: Class<T>
) = SimpleJmsListenerEndpoint().apply {
    id = listenerConfig.id
    destination = listenerConfig.destination
    concurrency = listenerConfig.concurrency
    selector = "_type = '${messageClass.canonicalName}'"
    messageListener = MqListenerImpl(messageConverter, handler)
}.let {
    jmsListenerEndpointRegistry.registerListenerContainer(it, listenerContainerFactory, true)
}

MqConnectionConfig.kt

@Configuration
@ConfigurationProperties(prefix = "mq")
@EnableJms
class MqConnectionConfig {
    var front: MqConfig = MqConfig()

    @Bean
    fun backConnectionFactory(): SingleConnectionFactory =
        SingleConnectionFactory(createFactory(front)).apply {
            setReconnectOnException(true)
        }

    private fun createFactory(config: MqConfig) =
        ActiveMQConnectionFactory(config.url).apply {
            user = config.user
            password = config.password
        }

    @Bean
    fun jmsListenerContainerFactory(): DefaultJmsListenerContainerFactory = DefaultJmsListenerContainerFactory().apply {
        setConnectionFactory(backConnectionFactory())
        setMessageConverter(jacksonJmsMessageConverter())
        setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)
    }

    @Bean
    fun jmsListenerEndpointRegistry(): JmsListenerEndpointRegistry = JmsListenerEndpointRegistry()

    @Bean
    fun jacksonJmsMessageConverter() = MappingJackson2MessageConverter().apply {
        setTargetType(MessageType.TEXT)
        setTypeIdPropertyName("_type")
        setObjectMapper(jacksonObjectMapper().apply {
            configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            registerModule(JavaTimeModule())
            disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
        })
    }
}

class MqConfig {
    var url: String = "tcp://localhost:61616"
    var user: String? = null
    var password: String? = null
}

Dependencies used in build.gradle.kts

    implementation("org.springframework.boot:spring-boot-starter-artemis")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

Solution

  • The problem is caused by the call to jmsMessage.acknowledge() in the application's MessageListener implementation. Spring acknowledges the message automatically, so there is no need to invoke this method manually in the code.

    Additionally, INDIVIDUAL_ACKNOWLEDGE is unnecessary in the above implementation; CLIENT_ACKNOWLEDGE can be used instead.
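
    A minimal sketch of how the two changes described in this answer might look when applied to the code from the question. MqMessage and MqListener are the types from the question. The function name clientAckContainerFactory and its parameters are hypothetical and used only for illustration; in the question's MqConnectionConfig the same change would go into the existing jmsListenerContainerFactory() bean. This is an untested outline, not a verified fix for every setup.

```kotlin
import jakarta.jms.ConnectionFactory
import jakarta.jms.Message
import jakarta.jms.MessageListener
import jakarta.jms.Session
import org.slf4j.LoggerFactory
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.converter.MessageConverter

// Listener from the question with the manual acknowledge() call removed.
// MqMessage and MqListener are the interfaces defined in the question.
class MqListenerImpl<T : MqMessage>(
    private val messageConverter: MessageConverter,
    private val handler: MqListener<T>,
) : MessageListener {

    override fun onMessage(jmsMessage: Message) {
        @Suppress("UNCHECKED_CAST")
        val message = messageConverter.fromMessage(jmsMessage) as? T
            ?: error("Unexpected message type")

        runCatching { handler.handleMessage(message) }
            .onFailure { log.error("Unable to handle message", it) }

        // No jmsMessage.acknowledge() here: per the answer above, Spring
        // acknowledges the message itself, so a manual call is redundant.
    }

    private companion object {
        val log = LoggerFactory.getLogger(MqListenerImpl::class.java)
    }
}

// Hypothetical helper: builds the container factory with the standard
// CLIENT_ACKNOWLEDGE mode instead of ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE.
fun clientAckContainerFactory(
    connectionFactory: ConnectionFactory,
    converter: MessageConverter,
): DefaultJmsListenerContainerFactory =
    DefaultJmsListenerContainerFactory().apply {
        setConnectionFactory(connectionFactory)
        setMessageConverter(converter)
        setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    }
```

    Exceptions from the handler are still caught and logged as in the question, so a failed handler does not prevent acknowledgement. If failed messages should be redelivered instead, the exception would have to propagate out of onMessage, which is a behaviour change beyond what this answer covers.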