I ran into a problem with the ActiveMQ Artemis server: the AMQ224016 error. It first appeared on version 2.20.0 and persisted after upgrading to 2.32.0, with different client combinations (2.20 or 2.32 clients connecting to the 2.32 server). The problem occurred after migrating from Spring Boot 2.7 to 3.x (which changed jakarta.jms-api from 2.0.3 to 3.1.0, along with the jakarta imports).
Here is an example stack trace from the artemis.log file:
2024-03-18 12:08:46,069 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception
org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ229027: Could not find reference on consumer ID=0, messageId = 25612055904 queue = example.queue
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.individualAcknowledge(ServerConsumerImpl.java:1013) ~[artemis-server-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.individualAcknowledge(ServerSessionImpl.java:1314) ~[artemis-server-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.slowPacketHandler(ServerSessionPacketHandler.java:618) ~[artemis-server-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:319) ~[artemis-server-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:32) ~[artemis-commons-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:57) ~[artemis-commons-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32) ~[artemis-commons-2.32.0.jar:2.32.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68) ~[artemis-commons-2.32.0.jar:2.32.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.32.0.jar:2.32.0]
This stack trace is logged for every handled message, which generates over 500 MB of logs per day in my case; before that, the server produced only about 100 KB of logs per day.
There was no problem with the client in an older service running Java 8 and artemis-jms-client:2.19.1 (because 2.20.0+ clients require Java 11+), nor in another service running Java 17, Spring Boot 2.7 and artemis-jms-client:2.20.0. For testing, I created an example application running on Java 17 with the newest 2.32.0 client, where the only change is the Spring Boot version, from 2.7.18 to 3.0.x/3.1.x/3.2.x, along with the jakarta imports (no changes in the listener implementation). On the older Spring Boot there is no problem on the server side.
Furthermore, I tried different combinations of Artemis instances: a newly created instance with the default configuration and a clustered instance (2 primary, 2 secondary), with both default and configured queues. With every combination the effect is the same: with the older jms-api it works, and with the newer one it does not.
I see that Artemis has supported the JMS 3.1.0 API (Jakarta EE 10) since version 2.23.0.
Is there anything I can do about this, and what could be missing or incompatible?
MRE
Requirements to reproduce the problem:
Full example repository (with sender): https://github.com/g4iner/artemis-tester/ and gist: https://gist.github.com/g4iner/ad2ad8d09a56aa538809833739866d54
My listener implementation (handles ExampleMessage from "example.queue"):
ExampleMqListener.kt
@Component
class ExampleMqListener : MqListener<ExampleMessage> {
    override fun handleMessage(message: ExampleMessage): Boolean {
        return runBlocking {
            LoggerFactory.getLogger(ExampleMqListener::class.java).info("Incoming ExampleMessage: $message")
            true
        }
    }
}

data class ExampleMessage(val data: String) : MqMessage

class MqListenerImpl<T : MqMessage>(private val messageConverter: MessageConverter, private val handler: MqListener<T>) : MessageListener {
    override fun onMessage(jmsMessage: Message) {
        val message = messageConverter.fromMessage(jmsMessage) as? T ?: error("Unexpected message type")
        runCatching {
            handler.handleMessage(message)
        }.getOrElse {
            LoggerFactory.getLogger(MqListenerImpl::class.java).error("Unable to handle message", it)
        }
        jmsMessage.acknowledge()
    }
}

interface MqListener<T> {
    fun handleMessage(message: T): Boolean
}
MqListenerConfig.kt
@Configuration
class MqListenerConfig(
    override val listenerContainerFactory: DefaultJmsListenerContainerFactory,
    override val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry,
    override val messageConverter: MessageConverter,
    private val mqListeners: List<MqListener<out MqMessage>>,
) : JmsListenerConfig {
    @EventListener(ApplicationReadyEvent::class)
    fun startListeners() {
        mqListeners.forEach {
            listenToQueue(it)
        }
    }

    private fun <T : MqMessage> listenToQueue(handler: MqListener<T>) {
        handler.messageType().let {
            listen(
                listenerConfig = ListenerConfig(it.simpleName, it.simpleName, "example.queue", "1-2"),
                handler = handler,
                messageClass = it
            )
        }
    }

    private fun <T : MqMessage> MqListener<T>.messageType(): Class<T> = (javaClass.genericInterfaces[0] as ParameterizedType).actualTypeArguments[0] as Class<T>
}

@JsonIgnoreProperties(value = ["type"])
interface MqMessage : Serializable {
    val type: String
        get() = javaClass.simpleName
}

class ListenerConfig(
    val messageType: String,
    val id: String,
    val destination: String,
    val concurrency: String
)

interface JmsListenerConfig {
    val listenerContainerFactory: DefaultJmsListenerContainerFactory
    val jmsListenerEndpointRegistry: JmsListenerEndpointRegistry
    val messageConverter: MessageConverter
}

fun <T : MqMessage> JmsListenerConfig.listen(
    listenerConfig: ListenerConfig,
    handler: MqListener<T>,
    messageClass: Class<T>
) = SimpleJmsListenerEndpoint().apply {
    id = listenerConfig.id
    destination = listenerConfig.destination
    concurrency = listenerConfig.concurrency
    selector = "_type = '${messageClass.canonicalName}'"
    messageListener = MqListenerImpl(messageConverter, handler)
}.let {
    jmsListenerEndpointRegistry.registerListenerContainer(it, listenerContainerFactory, true)
}
MqConnectionConfig.kt
@Configuration
@ConfigurationProperties(prefix = "mq")
@EnableJms
class MqConnectionConfig {
    var front: MqConfig = MqConfig()

    @Bean
    fun backConnectionFactory(): SingleConnectionFactory =
        SingleConnectionFactory(createFactory(front)).apply {
            setReconnectOnException(true)
        }

    private fun createFactory(config: MqConfig) =
        ActiveMQConnectionFactory(config.url).apply {
            user = config.user
            password = config.password
        }

    @Bean
    fun jmsListenerContainerFactory(): DefaultJmsListenerContainerFactory = DefaultJmsListenerContainerFactory().apply {
        setConnectionFactory(backConnectionFactory())
        setMessageConverter(jacksonJmsMessageConverter())
        setSessionAcknowledgeMode(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE)
    }

    @Bean
    fun jmsListenerEndpointRegistry(): JmsListenerEndpointRegistry = JmsListenerEndpointRegistry()

    @Bean
    fun jacksonJmsMessageConverter() = MappingJackson2MessageConverter().apply {
        setTargetType(MessageType.TEXT)
        setTypeIdPropertyName("_type")
        setObjectMapper(jacksonObjectMapper().apply {
            configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            registerModule(JavaTimeModule())
            disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
            disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
        })
    }
}

class MqConfig {
    var url: String = "tcp://localhost:61616"
    var user: String? = null
    var password: String? = null
}
Dependencies used in build.gradle.kts:
implementation("org.springframework.boot:spring-boot-starter-artemis")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
The problem is caused by the call to jmsMessage.acknowledge() in the application's MessageListener implementation. Spring acknowledges the message automatically, so there is no need to invoke this method manually in the code; with the explicit call the message ends up being acknowledged twice, which would explain the AMQ229027 "Could not find reference" error on the broker.
Additionally, in the above implementation the use of INDIVIDUAL_ACKNOWLEDGE is unnecessary; CLIENT_ACKNOWLEDGE can be used instead.
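
As a rough illustration of the two changes described above, here is a minimal sketch, not the definitive fix. It assumes the Spring Boot 3 / jakarta.jms setup from the question. MqMessage and MqListener are simplified stand-ins for the question's types so that the snippet compiles on its own, and the function name clientAckListenerContainerFactory is hypothetical.

```kotlin
import jakarta.jms.ConnectionFactory
import jakarta.jms.Message
import jakarta.jms.MessageListener
import jakarta.jms.Session
import org.slf4j.LoggerFactory
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.converter.MessageConverter

// Simplified stand-ins for the question's types (assumption: the real ones
// carry extra annotations and a Serializable supertype).
interface MqMessage

interface MqListener<T> {
    fun handleMessage(message: T): Boolean
}

// Same listener as in the question, minus the explicit acknowledge() call.
class MqListenerImpl<T : MqMessage>(
    private val messageConverter: MessageConverter,
    private val handler: MqListener<T>
) : MessageListener {
    private val log = LoggerFactory.getLogger(MqListenerImpl::class.java)

    override fun onMessage(jmsMessage: Message) {
        @Suppress("UNCHECKED_CAST")
        val message = messageConverter.fromMessage(jmsMessage) as? T
            ?: error("Unexpected message type")
        // Handler failures are logged and swallowed, as in the original code,
        // so onMessage still returns normally.
        runCatching { handler.handleMessage(message) }
            .onFailure { log.error("Unable to handle message", it) }
        // No jmsMessage.acknowledge() here: the listener container
        // acknowledges the message after onMessage returns.
    }
}

// Hypothetical helper: builds the container factory with the standard
// CLIENT_ACKNOWLEDGE mode instead of Artemis' INDIVIDUAL_ACKNOWLEDGE.
fun clientAckListenerContainerFactory(
    connectionFactory: ConnectionFactory,
    converter: MessageConverter
): DefaultJmsListenerContainerFactory =
    DefaultJmsListenerContainerFactory().apply {
        setConnectionFactory(connectionFactory)
        setMessageConverter(converter)
        setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
    }
```

In the question's MqConnectionConfig, the equivalent change would be to replace ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE with Session.CLIENT_ACKNOWLEDGE inside the existing jmsListenerContainerFactory() bean. Note that because the listener swallows handler exceptions, a failed message is still acknowledged, which matches the behaviour of the original code.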