Tags: go, publish-subscribe, google-cloud-pubsub

Google Pub/Sub message ordering not working (or increasing latency to over 10 seconds)?


I'm trying to write a simplified example that demonstrates Google Pub/Sub's message ordering feature (https://cloud.google.com/pubsub/docs/ordering). According to those docs, once message ordering is enabled for a subscription:

After the message ordering property is set, the Pub/Sub service delivers messages with the same ordering key in the order that the Pub/Sub service receives the messages. For example, if a publisher sends two messages with the same ordering key, the Pub/Sub service delivers the oldest message first.

I've used this to write the following example:

package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    uuid "github.com/satori/go.uuid"
)

func main() {
    client, err := pubsub.NewClient(context.Background(), "my-project")
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topicID := "test-topic-" + uuid.NewV4().String()
    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    defer topic.Delete(context.Background())

    subID := "test-subscription-" + uuid.NewV4().String()
    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
        Topic:                 topic,
        EnableMessageOrdering: true,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    defer sub.Delete(context.Background())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    messageReceived := make(chan struct{})
    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
        msg.Ack()
        messageReceived <- struct{}{}
    })

    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

    for i := 0; i < 2; i++ {
        select {
        case <-messageReceived:
        case <-time.After(10 * time.Second):
            log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
        }
    }
}

First, I tried the program without specifying OrderingKey: "foobar" in the topic.Publish() calls. This resulted in the following output:

> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!

In other words, the messages are not received in the order in which they were published. That is undesirable in my use case, and I'd like to prevent it by specifying an OrderingKey.

However, as soon as I added the OrderingKey fields to the publish calls, the program timed out after waiting 10 seconds to receive Pub/Sub messages:

> go run main.go
2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.
exit status 1

I would now expect to receive Dang1! first, followed by Dang2!, but instead I receive no messages at all. Any idea why?


Solution

  • The publishes are failing with the following error: Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering.

    topic.Publish() is asynchronous and returns a *pubsub.PublishResult, so the failure goes unnoticed unless you call Get on the result. You can see the error if you change your publish calls to check it:

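    // Publish is asynchronous; keep the results so the errors can be inspected.
    res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
    res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

    // Get blocks until the publish has completed and returns its error, if any.
    // log is used because the original program does not import fmt.
    if _, err := res1.Get(ctx); err != nil {
        log.Printf("Failed to publish: %v", err)
        return
    }

    if _, err := res2.Get(ctx); err != nil {
        log.Printf("Failed to publish: %v", err)
        return
    }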
    

    To fix it, set EnableMessageOrdering on the topic before publishing. Your topic creation would then look as follows:

    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    topic.EnableMessageOrdering = true
    defer topic.Delete(context.Background())