I have started working with Spring Integration to send messages to an external system, using the Spring Integration channel adapters for Google Cloud Pub/Sub.
I send the payload received by the service activator as shown below:
@ServiceActivator(inputChannel = "inputChannel")
public void messageReceiver(final String payloadMessage) throws IOException {
    adapter.sendData(payloadMessage); // send payloadMessage data to external system, add exception handlers
}
What I want is to add exception handling around adapter.sendData(payloadMessage) that covers various scenarios like
I have been following the Google Cloud documentation below, along with other online documentation, but have not found a use case that shows how to handle the above scenarios: https://cloud.google.com/pubsub/docs/spring#using-spring-integration-channel-adapters
Given these scenarios, I would like to implement exception handling in such a way that no data is lost when an exception occurs, and the external system still receives the data after some period of time even if there were exceptions.
I have configured the error channel below. Now, whenever there is an error in the sendData() method, I see the same failure messages repeating continuously in the Eclipse console. Do I need to add the spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period parameter in the YAML?
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(final @Qualifier("myInputChannel") MessageChannel inputChannel,
        PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, pubSubSubscriptionName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.AUTO_ACK);
    adapter.setErrorChannelName("pubsubErrors");
    return adapter;
}
@ServiceActivator(inputChannel = "pubsubErrors")
public void pubsubErrorHandler(Message<MessagingException> exceptionMessage) {
    BasicAcknowledgeablePubsubMessage originalMessage = (BasicAcknowledgeablePubsubMessage) exceptionMessage
            .getPayload().getFailedMessage().getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
    originalMessage.nack();
}
Sounds like you need some retry and backoff logic around your exceptions.
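To make "retry and backoff" concrete, here is a minimal plain-Java sketch that does not depend on Spring. The class name RetryWithBackoff and the retry helper are hypothetical, introduced only for illustration; the idea is to re-run a failing call a bounded number of times and double the pause after each failure.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Spring or the question's code.
final class RetryWithBackoff {

    /**
     * Calls the task up to maxAttempts times. After each failed attempt
     * (except the last) it sleeps, doubling the delay each time.
     * Rethrows the last exception once all attempts are used up.
     */
    static <T> T retry(Callable<T> task, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;          // exponential growth of the pause
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated external call that fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("external system unavailable");
            }
            return "sent";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a Spring Integration application you would normally not hand-roll this loop; the framework's retry advice provides the same behavior declaratively.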
See more info in docs: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#message-handler-advice-chain.
The @ServiceActivator has that adviceChain attribute for your consideration.
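As a rough sketch of how that could look, assuming Spring Integration 5.x or 6.x with Spring Retry on the classpath: the bean name retryAdvice, the class name RetryConfig, the ExternalSystemAdapter interface, and the attempt and interval values are all placeholders chosen for illustration, not something taken from the question.

```java
import java.io.IOException;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    // Hypothetical stand-in for the question's adapter object.
    public interface ExternalSystemAdapter {
        void sendData(String payload) throws IOException;
    }

    private final ExternalSystemAdapter adapter;

    public RetryConfig(ExternalSystemAdapter adapter) {
        this.adapter = adapter;
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        // Assumed values: 3 attempts in total, pauses of 1s then 2s (capped at 10s).
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(1000);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        retryTemplate.setBackOffPolicy(backOff);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }

    // adviceChain refers to the advice bean by name; the retry wraps only this handler.
    @ServiceActivator(inputChannel = "inputChannel", adviceChain = "retryAdvice")
    public void messageReceiver(final String payloadMessage) throws IOException {
        adapter.sendData(payloadMessage);
    }
}
```

With this in place, a failing sendData() call is retried inside the service activator before any exception reaches the inbound adapter. Only when the attempts are exhausted does the failure propagate to the pubsubErrors channel, where you can decide whether to nack the message or route it elsewhere.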