Search code examples
springamazon-web-servicesjmsmessage-queueamazon-sqs

JmsListener SQS unbale to consume messages


I have a configuration where at a time 10 messages are coming in parallel to a SQS queue. To consume it I am using JmsListener.

Let me show you my configuration:

  public SQSConnectionFactory sqsConnectionFactory() {
    // Create a new connection factory with all defaults (credentials and region) set automatically
    return new SQSConnectionFactory(new ProviderConfiguration(),
        AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
            .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
  }

  @Bean("jmsListenerContainerFactory")
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(sqsConnectionFactory());
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setConcurrency("3-10");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
  }

To use this :

  @JmsListener(destination = "queue.fifo",
      containerFactory = "jmsListenerContainerFactory")
  public void receiveCustomerStakeholderKyc(@Payload final Message<?> message) throws Exception {

}

When I am using this. Some messages are not even coming in the code. JMS is not consuming the messages and those messages are transferred to the dead_queue.

Queues:
1. queue.fifo

Name:   queue.fifo  
Default Visibility Timeout: 30 seconds
Message Retention Period:   4 days
Maximum Message Size:   256 KB
Created:    2019-09-16 12:50:43 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:35:29 GMT+05:30   
Messages Available (Visible):   0
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO    
Messages Delayed:   0
Content-Based Deduplication:    Enabled     

2. queue_dead.fifo
Default Visibility Timeout: 30 seconds  
Message Retention Period:   4 days  
Maximum Message Size:   256 KB
Created:    2019-09-16 12:51:08 GMT+05:30   
Receive Message Wait Time:  0 seconds
Last Updated:   2020-06-12 16:47:17 GMT+05:30   
Messages Available (Visible):   5
Delivery Delay: 0 seconds   
Messages in Flight (Not Visible):   0
Queue Type: FIFO    Messages Delayed:   0
Content-Based Deduplication:    Disabled

Is there anything I am missing

  1. when I looked at the was console it says, message received at this time but in my logs they are not received.

Is there a way to enable SQS logs ?


Solution

  • I must say I made a silly mistake but that can happen to anyone.

    Reason for the mistake

    I have a Dev and QA account of AWS. To save money we merge both the accounts but we let SQS in separate accounts.

    Way to access SQS queue

    I am trying to create the SQS connection factory like this:

     public SQSConnectionFactory sqsConnectionFactory() {
        // Create a new connection factory with all defaults (credentials and region) set automatically
        return new SQSConnectionFactory(new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
                .withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
      }
    

    In this way, JMS tries to resolve the SQS queue URL based on the AWS access and secret key. Now that we have merged the dev and QA accounts, even QA instance of my application was creating a dev SQS URL.

    Solution

    I solved it by dynamically resolving the SQS connection with the AWS account ID instead of AWS access and secret keys.

    Here is the code to resolve:

    Pass the ownerAccountId

      public static class CustomDynamicDestinationResolver implements DestinationResolver {
    
            private String ownerAccountId;
    
            public CustomDynamicDestinationResolver(String ownerAccountId) {
                this.ownerAccountId = ownerAccountId;
            }
    
            @Override
            public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
                Assert.notNull(session, "Session must not be null");
                Assert.notNull(destinationName, "Destination name must not be null");
                if (pubSubDomain) {
                    return resolveTopic(session, destinationName);
                } else {
                    return resolveQueue(session, destinationName);
                }
            }
    
            protected Topic resolveTopic(Session session, String topicName) throws JMSException {
                return session.createTopic(topicName);
            }
    
            protected Queue resolveQueue(Session session, String queueName) throws JMSException {
                Queue queue;
                //LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
                if (ownerAccountId != null && session instanceof SQSSession) {
                    queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
                } else {
                    queue = session.createQueue(queueName);
                }
                return queue;
            }
      }