I have a configuration where at a time 10 messages are coming in parallel to a SQS queue. To consume it I am using JmsListener.
Let me show you my configuration:
public SQSConnectionFactory sqsConnectionFactory() {
// Create a new connection factory with all defaults (credentials and region) set automatically
return new SQSConnectionFactory(new ProviderConfiguration(),
AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
.withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
}
@Bean("jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory());
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
To use this :
@JmsListener(destination = "queue.fifo",
containerFactory = "jmsListenerContainerFactory")
public void receiveCustomerStakeholderKyc(@Payload final Message<?> message) throws Exception {
}
When I am using this. Some messages are not even coming in the code. JMS is not consuming the messages and those messages are transferred to the dead_queue.
Queues:
1. queue.fifo
Name: queue.fifo
Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Maximum Message Size: 256 KB
Created: 2019-09-16 12:50:43 GMT+05:30
Receive Message Wait Time: 0 seconds
Last Updated: 2020-06-12 16:35:29 GMT+05:30
Messages Available (Visible): 0
Delivery Delay: 0 seconds
Messages in Flight (Not Visible): 0
Queue Type: FIFO
Messages Delayed: 0
Content-Based Deduplication: Enabled
2. queue_dead.fifo
Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Maximum Message Size: 256 KB
Created: 2019-09-16 12:51:08 GMT+05:30
Receive Message Wait Time: 0 seconds
Last Updated: 2020-06-12 16:47:17 GMT+05:30
Messages Available (Visible): 5
Delivery Delay: 0 seconds
Messages in Flight (Not Visible): 0
Queue Type: FIFO Messages Delayed: 0
Content-Based Deduplication: Disabled
Is there anything I am missing
Is there a way to enable SQS logs ?
I must say I made a silly mistake but that can happen to anyone.
Reason for the mistake
I have a Dev and QA account of AWS. To save money we merge both the accounts but we let SQS in separate accounts.
Way to access SQS queue
I am trying to create the SQS connection factory like this:
public SQSConnectionFactory sqsConnectionFactory() {
// Create a new connection factory with all defaults (credentials and region) set automatically
return new SQSConnectionFactory(new ProviderConfiguration(),
AmazonSQSClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
.withCredentials(DefaultAWSCredentialsProviderChain.getInstance()).build());
}
In this way, JMS tries to resolve the SQS queue URL based on the AWS access and secret key. Now that we have merged the dev and QA accounts, even QA instance of my application was creating a dev SQS URL.
Solution
I solved it by dynamically resolving the SQS connection with the AWS account ID instead of AWS access and secret keys.
Here is the code to resolve:
Pass the ownerAccountId
public static class CustomDynamicDestinationResolver implements DestinationResolver {
private String ownerAccountId;
public CustomDynamicDestinationResolver(String ownerAccountId) {
this.ownerAccountId = ownerAccountId;
}
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
Assert.notNull(session, "Session must not be null");
Assert.notNull(destinationName, "Destination name must not be null");
if (pubSubDomain) {
return resolveTopic(session, destinationName);
} else {
return resolveQueue(session, destinationName);
}
}
protected Topic resolveTopic(Session session, String topicName) throws JMSException {
return session.createTopic(topicName);
}
protected Queue resolveQueue(Session session, String queueName) throws JMSException {
Queue queue;
//LOGGER.info("Getting destination for libraryOwnerAccountId: {}, queueName: {}", ownerAccountId, queueName);
if (ownerAccountId != null && session instanceof SQSSession) {
queue = ((SQSSession) session).createQueue(queueName, ownerAccountId);
} else {
queue = session.createQueue(queueName);
}
return queue;
}
}