In my business application I have to batch-process all the messages from a topic periodically, because that is cheaper than processing them in a first-come-first-served fashion. My current plan is to have a cron job that runs the subscriber every T hours. The problem I am currently solving is how to terminate the subscriber once all the messages have been processed: I want to fire up the cron job every T hours, let the subscriber consume all the messages in the topic's queue, and terminate. From what I understand, there is no Pub/Sub Java API that tells me whether the queue is empty or not. I have come up with the following two solutions:
Solution 1: Create a subscriber that pulls asynchronously, sleep for t minutes while it consumes all the messages, and then terminate it using subscriber.stopAsync().awaitTerminated(). With this approach there is a possibility that I might not consume all the messages before terminating the subscriber. A Google example is here.
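A minimal sketch of this first option, written in Go to match the code in the answer below (the Java version would sleep and then call subscriber.stopAsync().awaitTerminated()); the project ID, subscription name and the handling inside the callback are placeholders:

package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
)

func drainForWindow() error {
    // Receive for a fixed window, then let the context expire to stop pulling.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) // the "t minutes" window
    defer cancel()

    client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
    if err != nil {
        return err
    }
    defer client.Close()

    // Receive blocks until the context deadline; if the window is too short,
    // unconsumed messages simply stay in the subscription for the next run.
    return client.Subscription("my-sub").Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        log.Printf("processing %s", m.ID) // placeholder for the real batch handling
        m.Ack()
    })
}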
Solution 2: Use Pub/Sub metrics in Cloud Monitoring to find the value of the metric subscription/num_undelivered_messages, pull that many messages using the synchronous pull example provided by Google here, and then terminate the Subscriber.
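A hedged sketch of reading that metric through the Cloud Monitoring API (again in Go; the project and subscription IDs are placeholders, and the exact import paths may vary with the client-library version):

package main

import (
    "context"
    "fmt"
    "time"

    monitoring "cloud.google.com/go/monitoring/apiv3"
    "github.com/golang/protobuf/ptypes/timestamp"
    monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// undeliveredCount returns the latest value of subscription/num_undelivered_messages
// for the given subscription, so the caller knows roughly how many messages to pull.
func undeliveredCount(ctx context.Context, projectID, subscriptionID string) (int64, error) {
    client, err := monitoring.NewMetricClient(ctx)
    if err != nil {
        return 0, err
    }
    defer client.Close()

    now := time.Now()
    req := &monitoringpb.ListTimeSeriesRequest{
        Name: "projects/" + projectID,
        Filter: fmt.Sprintf(`metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" `+
            `resource.labels.subscription_id="%s"`, subscriptionID),
        Interval: &monitoringpb.TimeInterval{
            // The metric is only sampled every minute or so; look a few minutes back.
            StartTime: &timestamp.Timestamp{Seconds: now.Add(-5 * time.Minute).Unix()},
            EndTime:   &timestamp.Timestamp{Seconds: now.Unix()},
        },
    }

    it := client.ListTimeSeries(ctx, req)
    ts, err := it.Next()
    if err != nil {
        return 0, err // includes iterator.Done when no data points exist yet
    }
    // Points are returned newest first; take the most recent value.
    return ts.Points[0].Value.GetInt64Value(), nil
}

The returned count could then be used as the max-messages value for the synchronous pull before terminating.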
Is there a better way to do this?
Thanks!
I did this same implementation in Go a few months ago. My assumption was the following: if no new message arrives within a short timeout (100ms in my case), the queue is empty and the processing is done.

Based on that, I implemented this:
* Each time I receive a message:
  * I suspend the 100ms timeout
  * I process and ack the message
  * I reset the 100ms timeout to 0
* If the 100ms timeout fires, I terminate my pull subscription
In my use case, I schedule my processing every 10 minutes, so I set a global timeout at 9m30s to finish the processing and let the next app instance continue it.

Just a tricky thing: for the first message, set the timeout to 2s. Indeed, the first message takes longer to arrive because of connection establishment, so set a flag when you initialize your timeout: "is this the first message or not".

I can share my Go code if it can help you with your implementation.
EDIT
Here is my Go code for the message handling:
func (pubSubService *pubSubService) Received() (msgArray []*pubsub.Message, err error) {
    ctx := context.Background()
    cctx, cancel := context.WithCancel(ctx)

    // Connect to PubSub
    client, err := pubsub.NewClient(cctx, pubSubService.projectId)
    if err != nil {
        log.Fatalf("Impossible to connect to pubsub client for project %s", pubSubService.projectId)
    }

    // Put all the messages in an array. It will be processed at the end (stored to BQ, as is)
    msgArray = []*pubsub.Message{}

    // Channel to receive messages
    var receivedMessage = make(chan *pubsub.Message)

    // Handler to receive messages (through the channel) or cancel the context if the timeout is reached
    go func() {
        // Initial timeout, because the first receive takes longer than the others
        timeOut := time.Duration(3000)
        for {
            select {
            case msg := <-receivedMessage:
                // After the first receive, the timeout is changed
                timeOut = pubSubService.waitTimeOutInMillis // Environment variable = 200
                msgArray = append(msgArray, msg)
            case <-time.After(timeOut * time.Millisecond):
                log.Debug("Cancel by timeout")
                cancel()
                return
            }
        }
    }()

    // Global timeout, to stop pulling in any case before the next scheduled run
    go func() {
        timeOut := pubSubService.globalWaitTimeOutInMillis // Environment variable = 750
        time.Sleep(timeOut * time.Second)
        log.Debug("Cancel by global timeout")
        cancel()
    }()

    // Connect to the subscription and pull it until the context is canceled
    sub := client.Subscription(pubSubService.subscriptionName)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
        select {
        case receivedMessage <- msg:
            msg.Ack()
        case <-cctx.Done():
            // The timeout goroutine has already stopped: don't ack, the message will be redelivered on the next run
            msg.Nack()
        }
    })
    return
}
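For completeness, a hypothetical cron entry point around this method could look like the following; newPubSubService and storeToBigQuery are assumed helpers, not part of the snippet above:

func main() {
    // Assumed constructor wiring projectId, subscriptionName and the timeouts
    // from environment variables; not shown above.
    svc := newPubSubService()

    msgs, err := svc.Received()
    if err != nil {
        log.Fatalf("pull failed: %v", err)
    }

    // Batch-process the drained messages (e.g. load them into BigQuery), then exit;
    // the next cron run starts a fresh instance.
    storeToBigQuery(msgs) // hypothetical
}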