Tags: google-cloud-platform, cloud, publish-subscribe, google-cloud-pubsub

How to check if the topic-queue is empty and then terminate the subscriber?


In my business application I have to batch-process all the messages from a topic periodically, because that is cheaper than processing them in a first-come, first-served fashion. My current plan is to have a cron job that runs the subscriber every T hours. The problem I am trying to solve is how to terminate the subscriber once all the messages have been processed. I want to fire up the cron job every T hours, let the subscriber consume all the messages in the topic-queue, and then terminate. From what I understand, there is no Pub/Sub Java API that tells me whether the topic-queue is empty or not. I have come up with the following 2 solutions:

  1. Create a subscriber that pulls asynchronously. Sleep for t minutes while it consumes the messages, then terminate it using subscriber.stopAsync().awaitTerminated(). With this approach, I might not consume all the messages before the subscriber is terminated. Google provides an example of an asynchronous pull subscriber.

  2. Use Cloud Monitoring for Pub/Sub to read the value of the metric subscription/num_undelivered_messages. Then pull that many messages using the synchronous pull example provided by Google, and terminate the subscriber.

Is there a better way to do this?

Thanks!


Solution

  • I implemented the same thing in Go a few months ago. My assumptions were the following:

    • If there are messages in the queue, the app consumes them very quickly (less than 100 ms between 2 messages).
    • If the queue is empty (my app has finished consuming all the messages), new messages can still arrive, but more than 100 ms apart.

    So I implemented this:

    • Each time I receive a message:
      • I suspend the 100 ms timeout
      • I process and ack the message
      • I reset the 100 ms timeout to 0
    • If the 100 ms timeout fires, I terminate my pull subscription

    In my use case, I schedule the processing every 10 minutes. So I set a global timeout of 9 min 30 s to stop the processing and let the next app instance continue it.

    One tricky thing: for the first message, set the timeout to 2 s. The first message takes longer to arrive because the connection has to be established. So when you initialize your timeout, keep a flag that says whether you are still waiting for the first message or not.
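    The steps above (idle timeout, longer first wait, global deadline) can be sketched with the Go standard library alone. This is only an illustration under stated assumptions: `drain`, `runDemo`, and the `src` channel are hypothetical names, and the channel stands in for the Pub/Sub callback, so no real Pub/Sub call appears here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drain collects items from src until one of three things happens:
// no item arrives within the current timeout, ctx is done (global deadline),
// or src is closed. The first wait uses firstTimeout; later waits use idleTimeout.
// Hypothetical helper: src stands in for the Pub/Sub receive callback.
func drain(ctx context.Context, src <-chan string, firstTimeout, idleTimeout time.Duration) []string {
	var batch []string
	timer := time.NewTimer(firstTimeout) // longer wait for the first message
	defer timer.Stop()
	for {
		select {
		case item, ok := <-src:
			if !ok {
				return batch // source closed
			}
			// Suspend the idle timer while the item is handled.
			if !timer.Stop() {
				select {
				case <-timer.C: // discard a tick that fired in the meantime
				default:
				}
			}
			batch = append(batch, item) // "process and ack" would go here
			// Restart the idle timer from zero with the short timeout.
			timer.Reset(idleTimeout)
		case <-timer.C:
			return batch // nothing arrived in time: the queue looks empty
		case <-ctx.Done():
			return batch // global deadline reached
		}
	}
}

// runDemo feeds a burst of three items and then goes quiet,
// like a subscription whose backlog has been consumed.
func runDemo() []string {
	src := make(chan string)
	go func() {
		for _, m := range []string{"a", "b", "c"} {
			src <- m
		}
		// The channel stays open; drain stops on the idle timeout.
	}()
	// Global deadline, playing the role of the 9 min 30 s limit.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return drain(ctx, src, 500*time.Millisecond, 100*time.Millisecond)
}

func main() {
	fmt.Println(runDemo()) // prints [a b c]
}
```

    Using a single `time.Timer` that is stopped and reset maps directly onto "suspend, process, reset"; the timeout values are placeholders and would need tuning against the real message rate.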

    I can share my Go code if it helps with your implementation.

    EDIT

    Here is my Go code for the message handling:

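    // Assumed imports (not shown in the original): "context", "time",
    // "cloud.google.com/go/pubsub", and a logger that provides Debug and Fatalf
    // (for example log "github.com/sirupsen/logrus").
    func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
        cctx, cancel := context.WithCancel(context.Background())
        defer cancel()
    
        // Connect to Pub/Sub
        client, err := pubsub.NewClient(cctx, pubSubService.projectId)
        if err != nil {
            log.Fatalf("Impossible to connect to pubsub client for project %s: %v", pubSubService.projectId, err)
        }
        defer client.Close()
    
        // Put all the messages in a slice. It will be processed at the end (stored to BQ, as is)
        msgArray = []*pubsub.Message{}
    
        // Channel to receive messages
        receivedMessage := make(chan *pubsub.Message)
        // Closed once the collector goroutine has stopped appending to msgArray
        done := make(chan struct{})
    
        // Collector: receive messages (through the channel) or cancel the context if the idle timeout is reached
        go func() {
            defer close(done)
            // Initial timeout, because the first message takes longer to arrive than the others
            timeOut := 3000 * time.Millisecond
            for {
                select {
                case msg := <-receivedMessage:
                    // After the first message, switch to the short idle timeout
                    timeOut = time.Duration(pubSubService.waitTimeOutInMillis) * time.Millisecond // Environment variable = 200
                    msgArray = append(msgArray, msg)
                case <-time.After(timeOut):
                    log.Debug("Cancel by timeout")
                    cancel()
                    return
                case <-cctx.Done():
                    return
                }
            }
        }()
    
        // Global timeout (as in the original, the value is applied in seconds despite the field name)
        go func() {
            globalTimeOut := time.Duration(pubSubService.globalWaitTimeOutInMillis) * time.Second // Environment variable = 750
            select {
            case <-time.After(globalTimeOut):
                log.Debug("Cancel by global timeout")
                cancel()
            case <-cctx.Done():
            }
        }()
    
        // Connect to the subscription and pull from it until the context is canceled
        sub := client.Subscription(pubSubService.subscriptionName)
        err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
            select {
            case receivedMessage <- msg:
                msg.Ack()
            case <-cctx.Done():
                // The collector has stopped: nack so the message is redelivered on the next run
                msg.Nack()
            }
        })
    
        <-done // wait for the collector before returning msgArray
        return msgArray, err
    }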