Search code examples
javaspring-integrationgoogle-cloud-pubsub

Spring Integration Error/Exception handling


I have started working with Spring Integration to send messages to external System using Spring Integration Google Pub/sub model.

I am sending the payload received by the Service activator as below

@ServiceActivator(inputChannel = "inputChannel")
              public void messageReceiver(final String payloadMessage) throws IOException {
    adapter.sendData(payloadMessage);  // send payloadMessage data to external system, add exception handlers
}

What I want is to implement Exception Handling to the adapter.sendData(payloadMessage) such that I would like to consider varies scenarios like

  1. External System down
  2. Network issues connecting to network from my system to external system.

I have been following the below Google cloud documentation and other online documentation but have not found sufficient usecase to handle the above scenarios https://cloud.google.com/pubsub/docs/spring#using-spring-integration-channel-adapters

Considering the above scenarios I would like to implement exception handling such a way that data is not lost when there are exceptions and external systems should receive data even if there are exceptions after some period of time.

I have configured the below error channel. Now there is any error in the sendData() method, I see the same failure messages keeping on loading in the eclipse console. Is there any need to add the param spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period in the yaml

 @Bean
public PubSubInboundChannelAdapter messageChannelAdapter(final @Qualifier("myInputChannel") MessageChannel inputChannel,
        PubSubTemplate pubSubTemplate) 
{
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, pubSubSubscriptionName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.AUTO_ACK);
    adapter.setErrorChannelName("pubsubErrors");
    return adapter;
}

@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message<MessagingException> exceptionMessage) {

    BasicAcknowledgeablePubsubMessage originalMessage = (BasicAcknowledgeablePubsubMessage) exceptionMessage
            .getPayload().getFailedMessage().getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);

    originalMessage.nack();
}

Solution

  • Sounds like you need some retry and backoff logic around your exceptions.

    See more info in docs: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#message-handler-advice-chain.

    The @ServiceActivator has that adviceChain attribute for your consideration.