I'm using GCP and Cloud PubSub to develop a chatting server now.
I wanted to send a message to the topic in Cloud PubSub from an instance using Spring Integration, but it sent too many messages.
DEFAULT 2023-06-14T18:53:50.785528Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:50.785542Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
ERROR 2023-06-14T18:53:51.690026Z org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage], failedMessage=GenericMessage [payload={"roomId":1,"userId":1,"message":"zxcv"}, headers={replyChannel=nullChannel, errorChannel=, id=8e1e8628-36cf-ebd5-bd36-925668f6644c, timestamp=1686768830200}] at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:111) at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:108) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1075) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:558) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222) at org.springframework.integration.dispatcher.BroadcastingDispatcher.…
DEFAULT 2023-06-14T18:53:52.215535Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.215551Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355030Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355046Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516490Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516506Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662022Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662037Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789312Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789327Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991170Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991186Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115730Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115747Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241358Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241373Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358250Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358270Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504470Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504486Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637282Z 2023-06-15T03:53:53.631+09:00 WARN 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.637335Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.637519Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637531Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.649037Z 2023-06-15T03:53:53.648+09:00 INFO 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.786696Z 2023-06-15T03:53:53.784+09:00 WARN 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.786742Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.786965Z 2023-06-15T03:53:53.786+09:00 INFO 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.827465Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.827481Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973948Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973962Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116426Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116443Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237137Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237158Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
This is the code of the instance.
// Create an inbound channel adapter to listen to the subscription `chat-sub` and send
// messages to the input message channel.
@Bean
fun inboundChannelAdapter(
@Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
pubSubTemplate: PubSubTemplate?
): PubSubInboundChannelAdapter? {
val adapter = PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub")
adapter.outputChannel = messageChannel
adapter.ackMode = AckMode.AUTO
adapter.payloadType = String::class.java
return adapter
}
// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
fun messageReceiver(
payload: String,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
) {
val model = gson.fromJson(payload, ChatMessageToPubSub::class.java)
messagingTemplate.convertAndSend("/sub/message/" + model.roomId.toString(), ChatMessageToClient(model.userId, model.message))
message.ack()
//LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
}
// Create an outbound channel adapter to send messages from the input message channel to the
// topic `chat`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? {
val adapter = PubSubMessageHandler(pubsubTemplate, "chat")
adapter.setSuccessCallback { ackId, message -> println("send success : " + message.payload) }
adapter.setFailureCallback { cause, message -> println("send fail : " + message.payload) }
return adapter
}
@MessagingGateway(defaultRequestChannel = "inputMessageChannel")
interface PubSubOutBoundGateway {
@kotlin.jvm.Throws(MessagingException::class)
fun sendToPubsub(text : String)
}
So I received about 200 messages at my client side by shutting down the instance.
The first error is
org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage]
I thought it is the key of the message resending, so I tried to add GcpPubSubHeaders.ORIGINAL_MESSAGE header. But I could not find the class that implements BasicAcknowledgeablePubsubMessage to generate the object.
What am I missing now?
Or is there any way stopping retrying when exception occurs?
It is because the message sent to the topic goes back to inputMessageChannel, so messageSender adapter works again.
So I separate inbound channel and outbound channel, so the code looks like this.
@Bean
fun inputMessageChannel(): MessageChannel? {
return PublishSubscribeChannel()
}
@Bean
fun outputMessageChannel(): MessageChannel? {
return PublishSubscribeChannel()
}
// Create an inbound channel adapter to listen to the subscription `chat-sub` and send
// messages to the input message channel.
@Bean
fun inboundChannelAdapter(
@Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
pubSubTemplate: PubSubTemplate?
): PubSubInboundChannelAdapter? {
val adapter = PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub")
adapter.outputChannel = messageChannel
adapter.ackMode = AckMode.MANUAL
adapter.payloadType = String::class.java
return adapter
}
// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
fun messageReceiver(
payload: String,
@Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
) {
val model = gson.fromJson(payload, ChatMessageToPubSub::class.java)
println("received message : " + message.pubsubMessage.data.toStringUtf8())
println(message.pubsubMessage.attributesMap)
message.ack()
messagingTemplate.convertAndSend("/sub/message/" + model.roomId.toString(), ChatMessageToClient(model.userId, model.message))
//LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
}
// Create an outbound channel adapter to send messages from the input message channel to the
// topic `chat`.
@Bean
@ServiceActivator(inputChannel = "outputMessageChannel")
fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? {
val adapter = PubSubMessageHandler(pubsubTemplate, "chat")
adapter.setSuccessCallback { ackId, message -> println("send success : " + ackId + " - " + message.payload) }
adapter.setFailureCallback { cause, message -> println("send fail : " + cause.stackTraceToString() + " - " + message.payload) }
adapter.setPublishTimeoutExpressionString("500")
return adapter
}
@MessagingGateway(defaultRequestTimeout = "500", defaultRequestChannel = "outputMessageChannel", errorChannel = "errorChannel")
interface PubSubOutBoundGateway {
@kotlin.jvm.Throws(MessagingException::class, MessageHandlingException::class)
fun sendToPubsub(text : String)
}
Then it works.