How to reply to a gateway from an error channel so it does not hang


I was introduced to Spring Integration through Josh Long's YouTube videos, and I thought it would be a good fit for one of our business cases.

What we want to do is simple on paper: data replication. We have a custom message store whose data is polled regularly, and we want to save the payloads (entities in JSON format) into our tables.

Our first flow polls the data, groups it by entity type, and dispatches each group to a different subflow, again by type.

IntegrationFlows
    .from(jdbcPollingChannelAdapter(queueName), p -> p.poller(m -> m.fixedDelay(250, TimeUnit.MILLISECONDS)))
    .split()
    .enrichHeaders(e -> e
        .headerExpression("type", "payload.type")
        .headerExpression("messageStoreId", "payload.id")
    )
    .aggregate(a -> a.outputProcessor(messageGroupProcessor))
    .gateway(c -> c.route("headers['type']") /*, c -> c.replyChannel("gatewayReplyChannel")*/)
    .log(Level.INFO, "After route...", "'-'")
    .get();

For each of our entities, we have a flow that upserts the data as a batch and, if that succeeds, updates the message store to mark the rows as "processed".

public static IntegrationFlow batch(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
    return IntegrationFlows
        .from(entityType.name())
        .enrichHeaders(e -> e.header("errorChannel", "batchInsertErrorChannel"))
        .split()
        .enrichHeaders(e -> e.headerExpression("messageStoreId", "payload.id", true))
        .transform(CommandMessageStore::payload)
        .transform(Transformers.fromJson(entityType.getClazz()))
        .handle((GenericHandler<? extends MessageDTO>) (payload, headers) -> {
            payload.setMessageStoreId(headers.get("messageStoreId", Long.class));
            return payload;
        })
        .aggregate()
        .handle(upsertOutboundGateway)
        .enrichHeaders(e -> e.header("status", "PROCESSED"))
        .handle((message, headers) -> headers.get("messageStoreIds", List.class))
        .handle(updateMessageStoreOutboundGateway(dataSource))
        .get();
}

public static JdbcOutboundGateway updateMessageStoreOutboundGateway(DataSource dataSource) {
    var jdbc = new JdbcOutboundGateway(dataSource,
        "update message_store set status = ?, error_message = ?, processed_date = now()::timestamp where id = ?");
    jdbc.setRequestPreparedStatementSetter((ps, message) -> {
        ps.setString(1, message.getHeaders().get("status", String.class));
        ps.setString(2, message.getHeaders().get("errorMessage", String.class));
        ps.setLong(3, (Long) message.getPayload());
    });
    return jdbc;
}

This works really nicely in the "happy path".

The problem is that the data we upsert is sometimes invalid, for example because of a foreign key integrity violation. When that happens, we want to go through the rows one by one and upsert each of them separately. The row with the foreign key error will of course fail again, but at least the other rows will be inserted. The message store is then updated with the "PROCESSED" status for the rows that were upserted successfully and "ERROR" for the others.
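To make the intended behaviour concrete, here is a minimal plain-Java sketch of that fallback, with no Spring Integration involved. `Row`, `fakeUpsert` and `upsertWithFallback` are hypothetical names used only for illustration, and the sketch assumes the batch is all-or-nothing (a failed batch leaves nothing inserted).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java model of "try the whole batch, then fall back to one row at a time". */
final class BatchFallbackSketch {

    /** Hypothetical row: only the message store id and a foreign key matter here. */
    record Row(long messageStoreId, Long parentId) { }

    /**
     * Upserts the rows as one batch; if that fails, upserts them one by one.
     * Returns the status to write to the message store, keyed by message store id.
     */
    static Map<Long, String> upsertWithFallback(List<Row> rows, Consumer<List<Row>> upsert) {
        Map<Long, String> statuses = new LinkedHashMap<>();
        try {
            upsert.accept(rows); // happy path: a single batched upsert
            rows.forEach(row -> statuses.put(row.messageStoreId(), "PROCESSED"));
        } catch (RuntimeException batchFailure) {
            for (Row row : rows) { // fallback: isolate the offending rows
                try {
                    upsert.accept(List.of(row));
                    statuses.put(row.messageStoreId(), "PROCESSED");
                } catch (RuntimeException rowFailure) {
                    statuses.put(row.messageStoreId(), "ERROR");
                }
            }
        }
        return statuses;
    }

    /** Stand-in for the JDBC upsert: a row without a parent mimics a foreign key violation. */
    static void fakeUpsert(List<Row> rows) {
        for (Row row : rows) {
            if (row.parentId() == null) {
                throw new IllegalStateException("foreign key violation for row " + row.messageStoreId());
            }
        }
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, 10L), new Row(2, null), new Row(3, 30L));
        // prints {1=PROCESSED, 2=ERROR, 3=PROCESSED}
        System.out.println(upsertWithFallback(rows, BatchFallbackSketch::fakeUpsert));
    }
}
```

In the real flows, the two branches of the `try`/`catch` correspond to the batch flow and the per-entity single-insert flows, and the returned statuses correspond to the message store updates.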

So the scenario is to go to an error channel when a foreign key violation occurs, and from there run the single-insert flows (again, one per entity). It looks like this:

@Bean
IntegrationFlow batchInsertErrorFlow() {
    return IntegrationFlows
        .from("batchInsertErrorChannel")
        .enrichHeaders(h -> h
            .headerExpression("type", "payload.failedMessage.headers['type']")
            //.replyChannelExpression("payload.cause.failedMessage.headers['replyChannel']") 
            //--> error: Reply message received but the receiving thread has exited due to an exception while sending the request message
            //.replyChannel("gatewayReplyChannel") --> error: StackOverflow
        )
        .transform("payload.cause.failedMessage.payload")
        .gateway(c -> c.route("headers['type'] + '_SINGLE'"))
        .get();
}

public static StandardIntegrationFlow single(EntityType entityType, JdbcOutboundGateway upsertOutboundGateway, DataSource dataSource) {
    return IntegrationFlows
        .from(entityType.name() + "_SINGLE")
        .split()
        .enrichHeaders(e -> e
            .headerExpression("messageStoreIds", "T(java.util.List).of(payload.messageStoreId)")
        )
        .handle(upsertOutboundGateway, e -> e.advice(singleInsertExpressionAdvice)) // I have to use an advice here; enriching the headers does not work
        .enrichHeaders(e -> e.header("status", "PROCESSED"))
        .handle((message, headers) -> headers.get("messageStoreIds", List.class))
        .handle(updateMessageStoreOutboundGateway(dataSource))
        .get();
}

@Bean
public Advice singleInsertExpressionAdvice() {
    var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setFailureChannelName("singleInsertErrorChannel");
    advice.setOnFailureExpressionString("headers['messageStoreIds']");
    advice.setTrapException(true);
    return advice;
}

@Bean
IntegrationFlow singleInsertErrorFlow(JdbcMessageHandler processFinishedHandler, DataSource dataSource) {
    return IntegrationFlows
        .from("singleInsertErrorChannel")
        .log(Level.INFO, "singleInsertErrorChannel-1", "'-'")
        .enrichHeaders(e -> e
            .headerExpression("errorMessage", "payload.cause.message")
            .header("status", "ERROR")
        )
        .transform("payload.evaluationResult")
        .handle(processFinishedHandler) // does the same thing as updateMessageStoreOutboundGateway()
        .get();
}

The problem is that each time the flow goes to the error channel, I don't know how to reply to the gateway of the initial flow, so it hangs. I know this because the "After route..." log is never displayed in that case. The data is processed successfully, but polling stops.

I have tried many things, some are commented in the code.

What am I missing in order to reply to the gateway once the error flow is done?


UPDATE (SOLVED)

Artem pointed out several flaws in my flows. This is what I changed:

  1. In the batch flow, I stopped setting the error channel via headers and used an advice instead (like in the single flow)
  2. In the batchInsertErrorFlow, I added .replyChannelExpression("payload.failedMessage.headers['replyChannel']")
  3. Same thing for the singleInsertErrorFlow
  4. Still in the singleInsertErrorFlow, I stopped using the JdbcMessageHandler and used a JdbcOutboundGateway, like in all the other flows.

After all this, when an error occurred in the upsert, I could see my "After route..." log in the first flow, and polling resumed.

Note: as Artem mentioned in a comment reply, my Spring Integration version (5.5.20, bundled with Spring Boot 2.7.18) has a bug with the log() statement at the end of a flow. When I removed it, I got the error "no output-channel or reply channel". I ended my flow with .nullChannel() instead.


Solution

  • Here is a rudimentary unit test that demonstrates your use case and how to deal with errors:

    @SpringJUnitConfig
    public class So77955301Tests {
        
        @Autowired
        Function<String, String> flowGateway;
        
        @Test
        void errorFlowRepliesToGateway() {
            assertThat(this.flowGateway.apply("test")).isEqualTo("error flow reply");
        }
        
        @Configuration
        @EnableIntegration
        static class TestConfiguration {
    
            @Bean
            IntegrationFlow flowWithErrorReply(ExpressionEvaluatingRequestHandlerAdvice singleInsertExpressionAdvice) {
                return IntegrationFlow.from(Function.class)
                        .<Object>handle((p, h) -> {
                            throw new RuntimeException("intentional");
                        }, e -> e.advice(singleInsertExpressionAdvice))
                        .get();
            }
    
            @Bean
            public ExpressionEvaluatingRequestHandlerAdvice singleInsertExpressionAdvice() {
                var advice = new ExpressionEvaluatingRequestHandlerAdvice();
                advice.setFailureChannelName("singleInsertErrorFlow.input");
                advice.setTrapException(true);
                return advice;
            }
    
            @Bean
            IntegrationFlow singleInsertErrorFlow() {
                return f -> f
                    .enrichHeaders(e -> e.replyChannelExpression("payload.failedMessage.headers[replyChannel]"))
                    .transform(payload -> "error flow reply");
            }
            
        }
    
    }
    

    The crucial part is replyChannelExpression("payload.failedMessage.headers[replyChannel]").

    The ErrorMessage sent to the error channel from that ExpressionEvaluatingRequestHandlerAdvice does not have all the required headers for replies. Therefore we need to extract them from the failedMessage.
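The effect of the missing header can be illustrated with a small plain-Java model. It deliberately uses no Spring types: `Msg`, `Failure`, `errorFlow` and `gateway` are hypothetical stand-ins, and a `BlockingQueue` plays the role of the temporary reply channel. It is only a sketch of the mechanism, not of the framework's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Plain-Java model (no Spring types) of a gateway waiting for a reply from an error flow. */
final class ReplyChannelSketch {

    /** Simplified message: a payload plus headers. */
    record Msg(Object payload, Map<String, Object> headers) { }

    /** Simplified error payload that carries the message which failed. */
    record Failure(Msg failedMessage, String cause) { }

    /** Error flow: replies to whatever its own replyChannel header points at. */
    @SuppressWarnings("unchecked")
    static void errorFlow(Msg errorMessage, boolean copyReplyChannel) {
        Map<String, Object> headers = new HashMap<>(errorMessage.headers());
        if (copyReplyChannel) {
            // Equivalent in spirit to replyChannelExpression("payload.failedMessage.headers[replyChannel]")
            Failure failure = (Failure) errorMessage.payload();
            headers.put("replyChannel", failure.failedMessage().headers().get("replyChannel"));
        }
        BlockingQueue<Object> replyChannel = (BlockingQueue<Object>) headers.get("replyChannel");
        if (replyChannel != null) {
            replyChannel.offer("error flow reply");
        }
        // Without the header there is nowhere to send the reply, so it is lost.
    }

    /** Gateway: sends a request whose handler fails, then looks for a reply (null if none arrived). */
    static Object gateway(boolean copyReplyChannel) {
        BlockingQueue<Object> replyChannel = new ArrayBlockingQueue<>(1);
        Msg request = new Msg("test", Map.of("replyChannel", replyChannel));
        // The handler fails and a NEW error message is created without the reply headers.
        Msg errorMessage = new Msg(new Failure(request, "intentional"), Map.of());
        errorFlow(errorMessage, copyReplyChannel);
        return replyChannel.poll();
    }

    public static void main(String[] args) {
        System.out.println(gateway(true));  // prints "error flow reply"
        System.out.println(gateway(false)); // prints "null"; a real gateway would keep waiting here
    }
}
```

The second call is the situation from the question: the error flow runs to completion, but nothing ever arrives on the channel the gateway is waiting on.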

    Also, your .handle(processFinishedHandler) is probably one-way, since you say that the gateway hangs. So, to be able to produce a reply back to the gateway (I believe that is .gateway(c -> c.route("headers['type'] + '_SINGLE'"))), you need to make your singleInsertErrorFlow a replying one and deal with those headers from the failed message.

    I think we can make this automatic in the framework by enriching the ErrorMessage created here:

            MessagingException messagingException =
                    new MessageHandlingExpressionEvaluatingAdviceException(message, "Handler Failed",
                            unwrapThrowableIfNecessary(exception), evalResult);
            ErrorMessage errorMessage = new ErrorMessage(messagingException);
            this.messagingTemplate.send(this.failureChannel, errorMessage);
    

    with the headers from the message that has just failed, but that would only make it into the current 6.3 release.