Search code examples
spring-bootspring-integrationspring-integration-dsl

Preserving the transaction of an IntegrationFlow when handling errors


Consider the following IntegrationFlow:

    @Bean
    public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                    .update(Update.update("status", "DONE"))
                    .collectionName("test").entityClass(Document.class), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
                .split()
                .handle((GenericHandler<Document>) (p, h) -> {
                    if(p.get("message").toString().contains("james"))
                        throw new IllegalArgumentException("no talking to james allowed");
                    // process message
                    return p;
                }).nullChannel();
    }

This gets a lock on the record and sets it to DONE. If the flow succeeds the transactions is committed otherwise it rolls back. In this case a message to my co-worker James will never succeed, so the message fails over and over again. My first attempt was to use an error channel to mark it FAILED after three failed attempts.

    @Bean
    public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                    .update(Update.update("status", "DONE"))
                    .collectionName("test").entityClass(Document.class), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
                .enrichHeaders(h -> h.errorChannel("testError"))
                .split()
                .handle((GenericHandler<Document>) (p, h) -> {
                    if(p.get("message").toString().contains("james"))
                        throw new IllegalArgumentException("no talking to james allowed");
                    // process message
                    return p;
                }).nullChannel();
    }

    @Bean
    public IntegrationFlow errorFlow(MongoTemplate mongoTemplate) {
        return IntegrationFlow.from("testError")
                .transform(MessageHandlingException::getFailedMessage)
                .handle((GenericHandler<Document>)(p, h) -> 
                    p.append("fails", p.getInteger("fails", 0) + 1)
                        .append("status", p.getInteger("fails") > 2 ? "FAILED" : "READY"))
                .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test")
                  .entityClass(Document.class)
                    .collectionCallback((c, m) -> 
                        c.findOneAndUpdate(Filters.eq("uuid", 
                                               ((Document) m.getPayload()).get("uuid")), 
                            Updates.combine(
                                Updates.set("status", ((Document) m.getPayload()).get("status")),
                                Updates.set("fails",  ((Document) m.getPayload()).get("fails" ))
                            )
                        )
                    )
                )
                .nullChannel();
    }

This works but the transaction is terminated when the message leaves the mongoFlow and is sent to the testError channel. I want to update the record to either increment the failure count with a status of READY or set the status to FAILED and then commit.

My next attempt was to use ExpressionEvaluatingRequestHandlerAdvice:

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice handleErrorAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setReturnFailureExpressionResult(true);
        advice.setOnSuccessExpression(new FunctionExpression<GenericMessage<Document>> (
            gm -> gm.getPayload().append("status", "DONE")
        ));
        advice.setOnFailureExpression(new FunctionExpression<GenericMessage<Document>> (
            gm -> gm.getPayload().append("fails", p.getInteger("fails", 0) + 1)
                        .append("status", p.getInteger("fails") > 2 ? "FAILED" : "READY")
        ));

        return advice;
    }

    @Bean
    public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                    .update(Update.update("status", "PROCESSING"))
                    .collectionName("test").entityClass(Document.class), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
                .split()
                .handle((GenericHandler<Document>) (p, h) -> {
                    if(p.get("message").toString().contains("james"))
                        throw new IllegalArgumentException("no talking to james allowed");
                    // process message
                    return p;
                }, e -> e.advice(handleErrorAdvice()))
              .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test")
                 .entityClass(Document.class)
                    .collectionCallback((c, m) -> 
                        c.findOneAndUpdate(Filters.eq("uuid", 
                                           ((Document) m.getPayload()).get("uuid")), 
                            Updates.combine(
                                Updates.set("status", ((Document) m.getPayload()).get("status")),
                                Updates.set("fails",  ((Document) m.getPayload()).get("fails" ))
                            )
                        )
                    )
                )
                .nullChannel();
    }

This updates the record within the same transaction and then saves it like I want. But I'm not sure if this is the right way of doing things. One odd thing about it that I don't really like is that I have to set the status to some intermediate value just for the sake of locking the record. Then I need the advice to set the status accordingly before the flow uses the outbound gateway to save it. I thought the error channel approach seemed more elegant but I'm not sure if it's possible to have the transaction boundary span both flows as long as the errorChannel is a DirectChannel.

Am I on the right track here or am I completely lost?

I should note that when testing the transactions, I had put some Thread.sleep() lines in there so I had time to try and steal the locked record. I removed it from the post just to make it cleaner.

EDIT: Latest iteration using RequestHandlerRetryAdvice combined with ExpressionEvaluatingRequestHandlerAdvice:

    @Bean
    public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
        return IntegrationFlow
                .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                    .collectionName("test").entityClass(Document.class)
                        .update(Update.update("status", "PROCESSING")), 
                        p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
                .split()
                .handle((GenericHandler<Document>) (p, h) -> {
                    if(p.get("message").toString().contains("james"))
                        throw new IllegalArgumentException("no talking to james allowed");
                    return p;
                }, e -> e.advice(statusAdvice(), retryAdvice()))
                .log( m -> "\n------AFTER ADVICE------\n" + m.getPayload())
                .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test").entityClass(Document.class)
                    .collectionCallback((c, m) -> 
                        c.findOneAndUpdate(Filters.eq("uuid", ((Document) m.getPayload()).get("uuid")), 
                                            Updates.set("status", ((Document) m.getPayload()).get("status"))
                        )
                    )
                 )
                .nullChannel();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(RetryTemplate.builder()
            .maxAttempts(3)
            .exponentialBackoff(1000L, 2, 10000)
            .build());
        return advice;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice statusAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setReturnFailureExpressionResult(true);
        advice.setOnSuccessExpression(new FunctionExpression<GenericMessage<Document>>(gm -> {
            System.out.println("============================== SUCCESS ADVICE ============================");
            return gm.getPayload().append("status", "DONE");
        }));
        advice.setOnFailureExpression(new FunctionExpression<GenericMessage<Document>>(gm -> {
            System.out.println("============================== FAILURE ADVICE ============================");
            return gm.getPayload().append("status", "FAILED");
        }));

        return advice;
    }

Solution

  • That's correct. The error handling in the Polling Endpoint is done after transaction is rolled back. The transaction interceptor is applied to the Callable<Message<?>> pollingTask as an AOP advice. This one is called from a pollForMessage() like this:

    private Runnable createPoller() {
        return () ->
                this.taskExecutor.execute(() -> {
                    int count = 0;
                    while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                        if (this.maxMessagesPerPoll == 0) {
                            logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                            break;
                        }
                        if (pollForMessage() == null) {
                            break;
                        }
                        count++;
                    }
                });
    }
    

    The error handling is done as a decoration on tasks scheduled to that this.taskExecutor.

    Therefore to keep transaction and handle such a business error is not possible via errorChannel configuration.

    You may consider to use a RequestHandlerRetryAdvice on your handle() to retry those business errors. This way the original transaction will be held and no new messages are going to be pulled from MongoDB.

    You indeed might also need to combine it with the ExpressionEvaluatingRequestHandlerAdvice as a first interceptor to handle errors already after all retries.

    See more info in docs: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html