Search code examples
spring-bootspring-amqpactivemq-artemisamq

How to publish to a Red Hat AMQ 7 topic using AMQP 1.0 from a test case using amqp-10-jms-spring-boot-starter


I created a subscriber to a multicast queue. When starting the application I can see that the multicast address gets created, the client queue underneath gets created and if I use the web interface to publish a message to the address my topic subscriber listens to.

enter image description here

But I cannot publish to the queue using a test case. The test results in this error when the jmsTemplate.convertAndSend is called : Received error from remote peer without description [condition = amqp:invalid-field]

Not sure if this error is because it is trying to create another connection with the same client id or not. How do I create a jmsTemplate for publishing? (I know this is testing it from a test case but my full application should listen on messages, enhance it and send it again to another queue. This is just to simulate my error I am getting)

My application code are below or on GitHub:

Springboot app

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms
public class MyApplication {

    public static void main(final String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

}

MyContainerFactory.java

import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;

@Component
public class MyContainerFactory extends DefaultJmsListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private DefaultJmsListenerContainerFactoryConfigurer configurer;

    @Setter
    private String containerClientId;

    @PostConstruct
    public void init() {
        configurer.configure(this, connectionFactory);
        setPubSubDomain(true);
        setClientId(containerClientId);
        setSubscriptionDurable(true);
    }
}

MyListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyListener {

    @Value("${addresses.multicast_topic_address}")
    private String myTopicAddress;

    @JmsListener(destination = "${addresses.multicast_topic_address}",
            containerFactory = "myContainerFactory",
            subscription = "${addresses.multicast_topic_address}")
    public void processMsg(final String message) {
        log.info("============= Received: " + message);
    }
}

application.yml

server:
  port: 9015
spring:
  jms:
    pub-sub-domain: true
    template:
      delivery-mode: persistent
      qos-enabled: true
    listener:
      acknowledge-mode: client
  main:
    web-application-type: none

amqphub:
  amqp10jms:
    remote-url: amqp://localhost:61616
    username: admin
    password: admin
    clientId: my_topic_client

addresses:
  multicast_topic_address: topic_address

With my test

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;

@Slf4j
@SpringBootTest(classes = MyApplication.class)
public class MyTests {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${addresses.multicast_topic_address}")
    private String myTopicAddress;

    @Test
    void testMe() {
        jmsTemplate.convertAndSend(myTopicAddress, "my custom message....");
    }
}

Full error message:

org.springframework.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]; nested exception is javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]

    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:277)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:661)
    at nl.ns.inkomend.processor.MyTests.testMe(MyTests.java:23)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
    at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:35)
    at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:21)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:682)
    at org.apache.qpid.jms.JmsConnection.createJmsConnection(JmsConnection.java:593)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:180)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:213)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:200)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:197)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494)
    ... 72 more
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:136)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.getOpenAbortExceptionFromRemote(AmqpConnectionBuilder.java:170)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:968)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:833)

Solution

  • Found the issue. In my DefaultJmsListenerContainerFactory class I had a setClientId action that set the client id to null as the variable was not set. Changing this to an actual value sorted it for me.

    @Component
    public class MyContainerFactory extends DefaultJmsListenerContainerFactory {
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Autowired
        private DefaultJmsListenerContainerFactoryConfigurer configurer;
    
        @PostConstruct
        public void init() {
            configurer.configure(this, connectionFactory);
            setPubSubDomain(true);
            setClientId("my_topic_client");  //This was not set 
            setSubscriptionDurable(true);
        }
    }