
Issue testing a Spring Cloud SQS listener


Environment

  • Spring Boot: 1.5.13.RELEASE
  • Cloud: Edgware.SR3
  • Cloud AWS: 1.2.2.RELEASE
  • Java 8
  • OSX 10.13.4

Problem

I am trying to write an integration test for SQS.

I have a LocalStack Docker container running locally, with SQS listening on TCP port 4576.

In my test code I define an SQS client with the endpoint set to local 4576 and can successfully connect and create a queue, send a message and delete a queue. I can also use the SQS client to receive messages and pick up the message that I sent.

My problem is that when I remove the code that receives the message manually, so that another component can pick it up instead, nothing seems to happen. I have a Spring component annotated as follows:

Listener

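import static org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy.ON_SUCCESS;

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class MyListener {
    // Should be invoked by the listener container for each message received from "my_queue".
    @SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
    public void receive(final MyMsg msg) {
        System.out.println("GOT THE MESSAGE: " + msg.toString());
    }
}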

Test

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest {

    @Autowired
    private AmazonSQSAsync amazonSQS;

    @Autowired
    private SimpleMessageListenerContainer container;

    private String queueUrl;

    @Before
    public void setUp() {
        queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
    }

    @After
    public void tearDown() {
        amazonSQS.deleteQueue(queueUrl);
    }

    @Test
    public void name() throws InterruptedException {
        amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
        System.out.println("isRunning:" + container.isRunning());
        System.out.println("isActive:" + container.isActive());
        System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
        Thread.sleep(30_000);
        System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
    }

    @TestConfiguration
    @EnableSqs
    public static class SQSConfiguration {

        @Primary
        @Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() {
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
            return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
        }
    }
}

In the test logs I see:

o.s.c.a.m.listener.QueueMessageHandler : 1 message handler methods found on class MyListener: {public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a}

2018-05-31 22:50:39.582 INFO 16329 --- o.s.c.a.m.listener.QueueMessageHandler : Mapped "org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a" onto public void MyListener.receive(MyMsg)

Followed by:

isRunning:true

isActive:true

isRunningOnQueue:false

GOT MESSAGE: 1

This demonstrates that the container did not pick up the message during the 30-second pause after it was sent: when I poll manually afterwards, the message is still on the queue and I can consume it.

My question is: why isn't the listener being invoked, and why does the isRunningOnQueue:false line suggest that the container was not auto-started for that queue?

Note that I also tried defining my own SimpleMessageListenerContainer bean with autostart explicitly set to true (the default anyway) and observed no change in behaviour. I thought that org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer, which is set up by @EnableSqs, ought to configure an auto-started SimpleMessageListenerContainer that polls for my message.

I have also set

logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG

in my test properties and can see the HTTP calls that create the queue, send a message, delete the queue and so on, but no HTTP calls to receive messages (apart from my manual one at the end of the test).


Solution

  • I figured this out after some tinkering.

    Even if the simple message container factory is set to not auto-start, the container still seems to run its initialisation, which involves determining whether the queue exists.

    In this case, the queue is created in my test's setUp method. Unfortunately, that runs after the Spring context has been set up, so the queue does not exist yet when the container initialises, and an exception occurs.

    I fixed this by moving the queue creation into the bean method that creates the SQS client, which runs before the message container is created, i.e.:

    @Bean(destroyMethod = "shutdown")
    public AmazonSQSAsync amazonSQS() {
        final AwsClientBuilder.EndpointConfiguration endpoint =
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
        final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
                .withEndpointConfiguration(endpoint)
                .build());
        // Create the queue while the client bean is built, so that it already exists
        // when the listener container initialises. The name must match @SqsListener.
        client.createQueue("my_queue");
        return client;
    }
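
The ordering problem described above can be illustrated with a small toy model that uses only the JDK. This is a sketch under stated assumptions, not the Spring Cloud AWS implementation: FakeSqs and FakeListenerContainer are hypothetical stand-ins, and the model assumes only what the answer observes, namely that the container resolves its queues once during initialisation and never re-checks them.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class StartupOrderingSketch {

    /** Hypothetical stand-in for the SQS endpoint: just a set of existing queue names. */
    static final class FakeSqs {
        private final Set<String> queues = new HashSet<>();

        void createQueue(String name) {
            queues.add(name);
        }

        boolean exists(String name) {
            return queues.contains(name);
        }
    }

    /** Hypothetical stand-in for a listener container that resolves its queues once, at start-up. */
    static final class FakeListenerContainer {
        private final Set<String> registered = new LinkedHashSet<>();

        FakeListenerContainer(FakeSqs sqs, String... listenerQueues) {
            for (String queue : listenerQueues) {
                // Assumption: a queue that cannot be resolved now is never polled later.
                if (sqs.exists(queue)) {
                    registered.add(queue);
                }
            }
        }

        boolean isRunning(String queue) {
            return registered.contains(queue);
        }
    }

    /** Mirrors the failing test: the "context" (container) is built before the queue is created. */
    static boolean queueCreatedAfterContext() {
        FakeSqs sqs = new FakeSqs();
        FakeListenerContainer container = new FakeListenerContainer(sqs, "my_queue");
        sqs.createQueue("my_queue"); // comparable to creating the queue in @Before
        return container.isRunning("my_queue");
    }

    /** Mirrors the fix: the queue is created while the client is built, before the container. */
    static boolean queueCreatedBeforeContext() {
        FakeSqs sqs = new FakeSqs();
        sqs.createQueue("my_queue"); // comparable to creating the queue in the client bean method
        FakeListenerContainer container = new FakeListenerContainer(sqs, "my_queue");
        return container.isRunning("my_queue");
    }

    public static void main(String[] args) {
        System.out.println("created after context:  " + queueCreatedAfterContext());
        System.out.println("created before context: " + queueCreatedBeforeContext());
    }
}
```

In this model the first scenario reports false for the queue, matching the isRunningOnQueue:false output in the question, and the second reports true. In the real test, the same ordering is achieved by creating the queue inside the client bean method rather than in the @Before method.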