Search code examples
spring-boot kotlin google-cloud-platform spring-integration google-cloud-pubsub

Too many messages are sent to the topic in Cloud PubSub [Spring Boot]


I'm currently developing a chat server on GCP using Cloud Pub/Sub.

I wanted to publish a message to a Cloud Pub/Sub topic from an instance using Spring Integration, but far too many messages were sent.

DEFAULT 2023-06-14T18:53:50.785528Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:50.785542Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
ERROR 2023-06-14T18:53:51.690026Z org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage], failedMessage=GenericMessage [payload={"roomId":1,"userId":1,"message":"zxcv"}, headers={replyChannel=nullChannel, errorChannel=, id=8e1e8628-36cf-ebd5-bd36-925668f6644c, timestamp=1686768830200}] at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:111) at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:108) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1075) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:558) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) at 
org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222) at org.springframework.integration.dispatcher.BroadcastingDispatcher.…
DEFAULT 2023-06-14T18:53:52.215535Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.215551Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355030Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.355046Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516490Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.516506Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662022Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.662037Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789312Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.789327Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991170Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:52.991186Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115730Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.115747Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241358Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.241373Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358250Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.358270Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504470Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.504486Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637282Z 2023-06-15T03:53:53.631+09:00 WARN 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.637335Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.637519Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.637531Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.649037Z 2023-06-15T03:53:53.648+09:00 INFO 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.786696Z 2023-06-15T03:53:53.784+09:00 WARN 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
ERROR 2023-06-14T18:53:53.786742Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at 
io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal…
DEFAULT 2023-06-14T18:53:53.786965Z 2023-06-15T03:53:53.786+09:00 INFO 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
DEFAULT 2023-06-14T18:53:53.827465Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.827481Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973948Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:53.973962Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116426Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.116443Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237137Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
DEFAULT 2023-06-14T18:53:54.237158Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}

This is the code of the instance.

    // Inbound channel adapter: listens to the subscription `chat-sub` and hands every
    // received message, with a String payload, to the input message channel.
    @Bean
    fun inboundChannelAdapter(
        @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
        pubSubTemplate: PubSubTemplate?
    ): PubSubInboundChannelAdapter? =
        PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub").apply {
            outputChannel = messageChannel
            ackMode = AckMode.AUTO
            payloadType = String::class.java
        }

    // Service activator for the input message channel: parses the JSON payload, relays the
    // chat message to the room's destination, and then acks the original Pub/Sub message.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    fun messageReceiver(
        payload: String,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
    ) {
        val chat = gson.fromJson(payload, ChatMessageToPubSub::class.java)
        val destination = "/sub/message/" + chat.roomId.toString()
        val outgoing = ChatMessageToClient(chat.userId, chat.message)

        messagingTemplate.convertAndSend(destination, outgoing)
        message.ack()
        //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
    }

    // Outbound channel adapter: a handler subscribed to the input message channel that
    // publishes every message it receives to the topic `chat` and prints the outcome.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? =
        PubSubMessageHandler(pubsubTemplate, "chat").apply {
            setSuccessCallback { _, sent -> println("send success : " + sent.payload) }
            setFailureCallback { _, failed -> println("send fail : " + failed.payload) }
        }
// Messaging gateway whose requests are sent to `inputMessageChannel`.
@MessagingGateway(defaultRequestChannel = "inputMessageChannel")
interface PubSubOutBoundGateway {
    // Sends `text` as a message through this gateway.
    @Throws(MessagingException::class)
    fun sendToPubsub(text: String)
}

As a result, my client had received about 200 messages by the time I shut down the instance.

The first error is

org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage]

I thought this was the cause of the message being resent, so I tried to add the GcpPubSubHeaders.ORIGINAL_MESSAGE header myself. However, I could not find a class implementing BasicAcknowledgeablePubsubMessage that I could instantiate to create the header value.

What am I missing now?

Or is there a way to stop the retries when an exception occurs?


Solution

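  • This happens because every message published to the topic comes back through the subscription into inputMessageChannel, where the messageSender adapter is also listening, so the adapter publishes the same message again, over and over.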

    So I separated the inbound channel from the outbound channel; the code now looks like this.

        // Channel that carries messages coming in from the subscription.
        @Bean
        fun inputMessageChannel(): MessageChannel? = PublishSubscribeChannel()
    
        // Channel that carries messages on their way out to the topic.
        @Bean
        fun outputMessageChannel(): MessageChannel? = PublishSubscribeChannel()
    
        // Inbound channel adapter: listens to the subscription `chat-sub` and hands every
        // received message, with a String payload, to the input message channel. The ack
        // mode is MANUAL; the receiver calls ack() on the original message itself.
        @Bean
        fun inboundChannelAdapter(
            @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
            pubSubTemplate: PubSubTemplate?
        ): PubSubInboundChannelAdapter? =
            PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub").also { inbound ->
                inbound.outputChannel = messageChannel
                inbound.ackMode = AckMode.MANUAL
                inbound.payloadType = String::class.java
            }
    
        // Service activator for the input message channel. Parses the JSON payload, prints
        // the raw Pub/Sub data and attributes, acks the original Pub/Sub message, and then
        // relays the chat message to the room's destination.
        @ServiceActivator(inputChannel = "inputMessageChannel")
        fun messageReceiver(
            payload: String,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
        ) {
            val chat = gson.fromJson(payload, ChatMessageToPubSub::class.java)

            // Debug output of what actually arrived from Pub/Sub.
            val original = message.pubsubMessage
            println("received message : " + original.data.toStringUtf8())
            println(original.attributesMap)

            // Ack first, then relay the message to the room's destination.
            message.ack()
            val destination = "/sub/message/" + chat.roomId.toString()
            messagingTemplate.convertAndSend(destination, ChatMessageToClient(chat.userId, chat.message))
            //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
        }
    
        // Outbound channel adapter: a handler subscribed to the output message channel that
        // publishes every message it receives to the topic `chat` and prints the outcome.
        // NOTE(review): the publish timeout presumably only takes effect when the handler
        // publishes synchronously — confirm against the PubSubMessageHandler documentation.
        @Bean
        @ServiceActivator(inputChannel = "outputMessageChannel")
        fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? =
            PubSubMessageHandler(pubsubTemplate, "chat").also { handler ->
                handler.setSuccessCallback { id, sent ->
                    println("send success : " + id + " - " + sent.payload)
                }
                handler.setFailureCallback { error, failed ->
                    println("send fail : " + error.stackTraceToString() + " - " + failed.payload)
                }
                handler.setPublishTimeoutExpressionString("500")
            }
    
    // Messaging gateway whose requests are sent to `outputMessageChannel`, configured
    // with a request timeout of "500" and `errorChannel` as its error channel.
    @MessagingGateway(
        defaultRequestChannel = "outputMessageChannel",
        defaultRequestTimeout = "500",
        errorChannel = "errorChannel"
    )
    interface PubSubOutBoundGateway {
        // Sends `text` as a message through this gateway.
        @Throws(MessagingException::class, MessageHandlingException::class)
        fun sendToPubsub(text: String)
    }
    

    Then it works.