Search code examples
javaamazon-web-servicesapache-camelamazon-sqs

java - Unable to delete the message from the SQS after it gets processed through the camel


I'm being unable to delete the message from the SQS after it gets processed... I tried several ways explained here on StackOverflow, but none of them worked for me...

Here is the SQS part I'm sending

@Override
public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
    try {
        Long companyID = transactionToSend.getCompanyID();
        validateRequest(companyID, transactionToSend.getCostCenterID());
        VendorTransaction vtr = ediManager.convertTransactionToVendorTransaction(transactionToSend);
        EDIOrderRequest ediOrderRequest = ediManager.createEDIOrderRequest(transactionToSend);

        String messageBody = getSQSMessageBody(vtr);
        SendMessageRequest sendMessageRequest = new SendMessageRequest(getQueueName(), messageBody);

        setMessageAttributes(sendMessageRequest, transactionToSend, ediOrderRequest);

        AmazonSQSAsync client = ServerConnector.getServerBean(AWSSQSService.ILocal.class).getSQSClient();
        SendMessageResult result = client.sendMessage(sendMessageRequest);
        if (result == null) {
            throw new EDIException("An error occurred while sending the message.");
        }

        log.info("Transaction: " + transactionToSend.getId() + " for a company: " + companyID + " sent. Message id is: " + result.getMessageId());
    } catch (Exception e) {
        e.printStackTrace();
        log.error(String.format("Error sending transaction: %s", e.getMessage()));
        throw new EMException(-1, String.format("Error sending transaction: %s", e.getMessage()), e);
    }
}

private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
    addMessageAttributesEntry(sendMessageRequest, "companyId", transactionToSend.getCompanyID());
    addMessageAttributesEntry(sendMessageRequest, "standard", ediConfig.getMapOfParameters().get("EDI_STD"));
    addMessageAttributesEntry(sendMessageRequest, "endpoint", ediConfig.getTargetAddress());
    addMessageAttributesEntry(sendMessageRequest, "prefix", ediConfig.getMapOfParameters().get("FILE_PREFIX"));
    addMessageAttributesEntry(sendMessageRequest, "requestId", ediOrderRequest != null ? ediOrderRequest.getId() : null);
}

Here is the route itself

@Override
public void configure() {

    // on exception deal with EDIOrder stuff
    onException(Exception.class).handled(true).process(failedOrdersProcessor).end();

    String destination = from != null ? from : (sqsQueue);
    destination += "&attributeNames=All&messageAttributeNames=All";
    log.info(destination);
    from(destination)
            .to("direct:ediOrder");
    // order processing
    from("direct:ediOrder")
            .choice()
            .when(body().isInstanceOf(String.class))
            .process(exchange -> {
                //filter out duplicates
                String body = "undefined";
                try {
                    body = exchange.getIn().getBody().toString();
                    VendorTransaction transaction = objectMapper.readValue(body, VendorTransaction.class);
                    if (!IDEMPOTENT_ORDERS_REPOSITORY.containsKey(transaction.getDeduplicatableIdentifier())) {
                        IDEMPOTENT_ORDERS_REPOSITORY.put(transaction.getDeduplicatableIdentifier(), transaction);
                    } else {
                        exchange.getIn().setHeader(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
                        if (transaction.getDeduplicatableIdentifier() != null) {
                            exchange.getIn().setHeader(ConfigKey.DEDUPLICATE_IDENTIFIER.getKey(), transaction.getDeduplicatableIdentifier());
                        }
                    }
                } catch (IOException e) {
                    log.error("Unable to convert the body to VendorTransaction, body:\n" + body + "\n" + e.getMessage());
                }
            })
            .end()
            .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE))
            .bean(DuplicateOrderProcessor.class)
            .stop()
            .end()
            .bean(SQSOrderQueueBean.class)
            .choice()
            .when(body().isInstanceOf(EDIOrder.class))
            .recipientList(simple("${body.endpoint}"))
            .log("Uploaded ${body.genericFile.fileName} to ${body.endpointNoPassword}")
            .bean(OrderCleaner.class)
            .endChoice()
            .when(body().isInstanceOf(VendorTransactionContainer.class))
            .log("Send to direct:confirmation")
            .to(RouteConstants.DIRECT_CONFIRMATION)
            .endChoice()
            .end();

The file gets processed and sent to the server, but I'm unable to delete the message after it gets processed.

Here is the stack trace I get:

WARN  o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)

If someone thinks that this could be a duplicate question, there are few answers which are similar to this one, here on StackOverflow, but none of them helped me resolve the issue...


Solution

  • I've fixed this issue by passing the headers to exchange.getOut() message as when we set the body like this

    exchange.getOut().setBody(body);
    

    we are creating a new instance of a Message, so the headers from exchange.getIn() and everything else from getIn() won't be present in exchange.getOut() message.