
QueueChannel using JdbcChannelMessageStore does not show error when serialization fails


I just spent a few hours debugging why my Spring Integration flow was not working. The flow uses a QueueChannel with a JdbcChannelMessageStore to implement a transactional outbox. However, the message was never stored in the database. When I replaced the queue with an in-memory one (using MessageChannels.queue("test-queue")), everything worked fine.

When I enabled DEBUG logging, I saw only preSend on the channel, never postSend. There was also no error or stack trace in the console.

In the end, I put a breakpoint on ChannelMessageStorePreparedStatementSetter.setValues(). That function has the following code:

        if (this.serializer != null) {
            byte[] messageBytes = this.serializer.convert(requestMessage);
            this.lobHandler.getLobCreator().setBlobAsBytes(preparedStatement, 6, messageBytes); // NOSONAR magic number
        }

Using the debugger, I could inspect the exception thrown by this.serializer.convert(requestMessage). It indicated that my message was not fully Serializable. I can fix that now by making the message fully serializable, but I would like to know whether Spring Integration can at least log a warning when this happens.
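
As a side note, a check like the following could catch this kind of problem before the message ever reaches the store. It is only a minimal sketch using plain Java serialization: SerializationCheck, BrokenPayload and findSerializationProblem are hypothetical names invented for illustration, and it assumes the store is using default Java serialization, as the exception I saw suggests.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class SerializationCheck {

    // Hypothetical payload: the class is Serializable, but one of its fields is not.
    static class BrokenPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text = "hello";
        final CountDownLatch latch = new CountDownLatch(1); // CountDownLatch is not Serializable
    }

    /**
     * Tries to serialize the payload into an in-memory buffer.
     * Returns null on success, otherwise a description of the failure.
     */
    static String findSerializationProblem(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return null;
        } catch (NotSerializableException ex) {
            // The exception message normally names the class that could not be serialized.
            return "not serializable: " + ex.getMessage();
        } catch (IOException ex) {
            return ex.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(findSerializationProblem("plain string"));
        System.out.println(findSerializationProblem(new BrokenPayload()));
    }
}
```

Running such a check in a unit test for each payload type would report the offending field's class without needing a debugger.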

EDIT:

This is my full config:

@Configuration
public class JmsConfiguration {

  @Bean
  public DirectChannel jmsInput() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow jmsOutboundFlow(
      JdbcChannelMessageStore jdbcChannelMessageStore,
      JmsMessenger jmsMessenger,
      JmsProperties properties,
      @Qualifier("springIntegrationTransactionInterceptor") TransactionInterceptor transactionInterceptor
  ) {
    return IntegrationFlow.from(this.jmsInput())
        .channel(this.jmsOutbox(jdbcChannelMessageStore))
        .handle(message -> {
          JmsMessage jmsMessage = (JmsMessage) message.getPayload();
          jmsMessenger.sendMessage(jmsMessage);
        }, e -> e.poller(Pollers.fixedDelay(properties.outboxPollingDuration())
            .transactional(transactionInterceptor)))
        .get();
  }

  @Bean
  public QueueChannel jmsOutbox(JdbcChannelMessageStore jdbcChannelMessageStore) {
    return MessageChannels.queue(jdbcChannelMessageStore, "jms-outbox").getObject();
  }

}

And my gateway that triggers the flow:

@MessagingGateway
public interface JmsGateway {

  String JMS_INPUT_CHANNEL = "jmsInput";

  @Gateway(requestChannel = JMS_INPUT_CHANNEL)
  void sendMessage(JmsMessage message);
}

JmsMessage and JmsMessenger are my own classes: JmsMessage represents a message that should be sent over JMS, and JmsMessenger is the component that actually uses JmsTemplate to send it.


Solution

  • The serialization exception is indeed thrown from that logic, as this test confirms:

    @Test
    void notSerializableMessage() {
        assertThatExceptionOfType(SerializationFailedException.class)
                .isThrownBy(() -> this.messageStore.addMessageToGroup("some_group",
                        new GenericMessage<>(new CountDownLatch(1))))
                .withStackTraceContaining("Caused by: java.io.NotSerializableException: java.util.concurrent.CountDownLatch");
    }
    

    This addMessageToGroup() is called from MessageGroupQueue.offer(), which, in turn, is called from QueueChannel.doSend(). Therefore the exception bubbles up to the producer for this channel.

    If you don't see any errors, then we need to look into your producer. Perhaps you send to this queue channel from a source polling channel adapter, which comes with a default error channel and a LoggingHandler as its subscriber.
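
The call chain described above can be illustrated with a small plain-Java sketch. FailingStore and StoreBackedChannel are hypothetical stand-ins, not the real Spring Integration classes, and the exception type is chosen only for illustration. The point is that nothing between send() and the store catches the exception, so the producer is the place to observe and log it.

```java
import java.util.ArrayList;
import java.util.List;

public class BubbleUpSketch {

    // Hypothetical stand-in for a message store whose serializer rejects the payload.
    static class FailingStore {
        void addMessageToGroup(Object groupId, Object message) {
            throw new IllegalStateException("Failed to serialize " + message.getClass().getName());
        }
    }

    // Hypothetical stand-in for a store-backed queue channel: send() delegates to the store.
    static class StoreBackedChannel {
        private final FailingStore store = new FailingStore();

        boolean send(Object message) {
            store.addMessageToGroup("jms-outbox", message); // no try/catch on the way down
            return true;
        }
    }

    // Producer side: the exception surfaces here, so this is where it can be logged.
    static List<String> produce(StoreBackedChannel channel, Object payload) {
        List<String> log = new ArrayList<>();
        try {
            channel.send(payload);
            log.add("sent");
        } catch (RuntimeException ex) {
            log.add("send failed: " + ex.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(produce(new StoreBackedChannel(), new Object()));
    }
}
```

In the real flow, the equivalent step would be to catch and log exceptions around the gateway call, or to check which component on the calling side handles them.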