I'm unable to delete a message from SQS after it has been processed. I tried several of the approaches described here on Stack Overflow, but none of them worked for me.
Here is the code that sends the message to SQS:
    @Override
    public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
        try {
            Long companyID = transactionToSend.getCompanyID();
            validateRequest(companyID, transactionToSend.getCostCenterID());
            VendorTransaction vtr = ediManager.convertTransactionToVendorTransaction(transactionToSend);
            EDIOrderRequest ediOrderRequest = ediManager.createEDIOrderRequest(transactionToSend);
            String messageBody = getSQSMessageBody(vtr);
            SendMessageRequest sendMessageRequest = new SendMessageRequest(getQueueName(), messageBody);
            setMessageAttributes(sendMessageRequest, transactionToSend, ediOrderRequest);
            AmazonSQSAsync client = ServerConnector.getServerBean(AWSSQSService.ILocal.class).getSQSClient();
            SendMessageResult result = client.sendMessage(sendMessageRequest);
            if (result == null) {
                throw new EDIException("An error occurred while sending the message.");
            }
            log.info("Transaction: " + transactionToSend.getId() + " for a company: " + companyID + " sent. Message id is: " + result.getMessageId());
        } catch (Exception e) {
            e.printStackTrace();
            log.error(String.format("Error sending transaction: %s", e.getMessage()));
            throw new EMException(-1, String.format("Error sending transaction: %s", e.getMessage()), e);
        }
    }

    private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
        addMessageAttributesEntry(sendMessageRequest, "companyId", transactionToSend.getCompanyID());
        addMessageAttributesEntry(sendMessageRequest, "standard", ediConfig.getMapOfParameters().get("EDI_STD"));
        addMessageAttributesEntry(sendMessageRequest, "endpoint", ediConfig.getTargetAddress());
        addMessageAttributesEntry(sendMessageRequest, "prefix", ediConfig.getMapOfParameters().get("FILE_PREFIX"));
        addMessageAttributesEntry(sendMessageRequest, "requestId", ediOrderRequest != null ? ediOrderRequest.getId() : null);
    }
Here is the Camel route that consumes from the queue:
    @Override
    public void configure() {
        // on exception deal with EDIOrder stuff
        onException(Exception.class).handled(true).process(failedOrdersProcessor).end();
        String destination = from != null ? from : (sqsQueue);
        destination += "&attributeNames=All&messageAttributeNames=All";
        log.info(destination);
        from(destination)
            .to("direct:ediOrder");
        // order processing
        from("direct:ediOrder")
            .choice()
                .when(body().isInstanceOf(String.class))
                    .process(exchange -> {
                        // filter out duplicates
                        String body = "undefined";
                        try {
                            body = exchange.getIn().getBody().toString();
                            VendorTransaction transaction = objectMapper.readValue(body, VendorTransaction.class);
                            if (!IDEMPOTENT_ORDERS_REPOSITORY.containsKey(transaction.getDeduplicatableIdentifier())) {
                                IDEMPOTENT_ORDERS_REPOSITORY.put(transaction.getDeduplicatableIdentifier(), transaction);
                            } else {
                                exchange.getIn().setHeader(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
                                if (transaction.getDeduplicatableIdentifier() != null) {
                                    exchange.getIn().setHeader(ConfigKey.DEDUPLICATE_IDENTIFIER.getKey(), transaction.getDeduplicatableIdentifier());
                                }
                            }
                        } catch (IOException e) {
                            log.error("Unable to convert the body to VendorTransaction, body:\n" + body + "\n" + e.getMessage());
                        }
                    })
            .end()
            .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE))
                .bean(DuplicateOrderProcessor.class)
                .stop()
            .end()
            .bean(SQSOrderQueueBean.class)
            .choice()
                .when(body().isInstanceOf(EDIOrder.class))
                    .recipientList(simple("${body.endpoint}"))
                    .log("Uploaded ${body.genericFile.fileName} to ${body.endpointNoPassword}")
                    .bean(OrderCleaner.class)
                .endChoice()
                .when(body().isInstanceOf(VendorTransactionContainer.class))
                    .log("Send to direct:confirmation")
                    .to(RouteConstants.DIRECT_CONFIRMATION)
                .endChoice()
            .end();
    }
The file gets processed and sent to the server, but the message is not deleted from the queue afterwards.
Here is the warning I get in the log:
WARN o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)
In case someone thinks this is a duplicate: there are a few similar questions here on Stack Overflow, but none of their answers helped me resolve the issue.
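I fixed this issue by copying the headers from exchange.getIn() onto the exchange.getOut() message. When we set the body like this
    exchange.getOut().setBody(body);
we create a new Message instance, so the headers (and everything else) from exchange.getIn() are not present on the exchange.getOut() message. That includes the receipt handle header the SQS consumer needs in order to delete the message, which is why the delete request fails with a missing parameter.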
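To illustrate why the receipt handle goes missing, here is a minimal, self-contained sketch. The `Message` and `Exchange` classes below are simplified stand-ins I wrote for this example, not Camel's real classes, and the class and helper method names are made up; they only mimic the one behaviour that matters here, namely that `getOut()` starts out as a fresh, empty message. The header name `CamelAwsSqsReceiptHandle` is the one camel-aws-sqs uses for the receipt handle, as far as I know.

```java
import java.util.HashMap;
import java.util.Map;

public class OutHeaderDemo {

    // Header under which camel-aws-sqs is assumed to keep the receipt handle.
    public static final String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";

    /** Hypothetical stand-in for a Camel Message: a body plus a header map. */
    public static class Message {
        private Object body;
        private final Map<String, Object> headers = new HashMap<>();

        public Object getBody() { return body; }
        public void setBody(Object body) { this.body = body; }
        public Map<String, Object> getHeaders() { return headers; }
        public void setHeaders(Map<String, Object> source) {
            headers.clear();
            headers.putAll(source);
        }
    }

    /** Hypothetical stand-in for a Camel Exchange: getOut() lazily creates an empty message. */
    public static class Exchange {
        private final Message in = new Message();
        private Message out;

        public Message getIn() { return in; }
        public Message getOut() {
            if (out == null) {
                out = new Message(); // no headers are carried over from IN
            }
            return out;
        }
    }

    /** Builds an exchange as it might look right after the SQS consumer received a message. */
    public static Exchange received(String body, String receiptHandle) {
        Exchange exchange = new Exchange();
        exchange.getIn().setBody(body);
        exchange.getIn().getHeaders().put(RECEIPT_HANDLE, receiptHandle);
        return exchange;
    }

    /** The problematic pattern: only the body is set on the OUT message. */
    public static Message setBodyOnly(Exchange exchange, Object newBody) {
        exchange.getOut().setBody(newBody);
        return exchange.getOut();
    }

    /** The fix: copy the IN headers to the OUT message, then set the body. */
    public static Message setBodyKeepingHeaders(Exchange exchange, Object newBody) {
        exchange.getOut().setHeaders(exchange.getIn().getHeaders());
        exchange.getOut().setBody(newBody);
        return exchange.getOut();
    }

    public static void main(String[] args) {
        Message lost = setBodyOnly(received("order", "example-handle"), "converted");
        Message kept = setBodyKeepingHeaders(received("order", "example-handle"), "converted");
        System.out.println("body only:      " + lost.getHeaders().get(RECEIPT_HANDLE));
        System.out.println("headers copied: " + kept.getHeaders().get(RECEIPT_HANDLE));
    }
}
```

In a real Camel 2.x processor the equivalent fix would be `exchange.getOut().setHeaders(exchange.getIn().getHeaders());` before setting the body. Alternatively, calling `exchange.getIn().setBody(body);` modifies the existing message in place, so the headers are kept without any copying.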