
Messages get swapped after some time in Watermill (Golang)


I am using Watermill to build software in which a message passes through service1, then service2, and finally the last service. I use a slice to track the order of the messages (FIFO, since GoChannel should respect FIFO order). After several runs, I see Watermill swapping messages. For example, I send message A and then message B, but in the last service message B arrives first and message A second. Below is a small script that reproduces the problem. It looks like a race condition because it does not happen every time, but when the script runs and the issue occurs, it prints a line like this: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6-45a4-bda6-07e021511d3f.

/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and receiving messages through a channel.
*/

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice

// Safe Slice struct just for control of the messages
type SafeSlice struct {
    mu    sync.Mutex
    slice []string
}

func (s *SafeSlice) Append(value string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.slice = append(s.slice, value)
}

func (s *SafeSlice) Get(index int) (string, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if index < 0 || index >= len(s.slice) {
        return "Index out of scope", false
    }
    return s.slice[index], true
}

func (s *SafeSlice) Remove(index int) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if index < 0 || index >= len(s.slice) {
        return false
    }
    s.slice = append(s.slice[:index], s.slice[index+1:]...)
    return true
}

// service1 function is a handler for the "service-1" service. It appends the message UUID to the
// safe slice and publishes the message to the "service-2-input" channel.
func service1(msg *message.Message) error {
    safeSlice.Append(msg.UUID)
    err := pubSub1.Publish("service-2-input", msg)
    if err != nil {
        panic(err)
    }

    return nil
}

// service2 function is a handler for the "service-2" service. It receives a message, performs
// some logic, and returns a slice of messages.
func service2(msg *message.Message) ([]*message.Message, error) {
    fmt.Printf("Message in service 2 %v\n", msg)

    // Add some logic

    return message.Messages{msg}, nil
}

// service_last function is a handler for the "service_last" service. It compares the message
// UUID with the first UUID in the safe slice and removes the first UUID if they match.
func service_last(msg *message.Message) error {
    uuid, _ := safeSlice.Get(0)

    fmt.Printf("service_last %v\n", msg)

    if msg.UUID == uuid {
        fmt.Println("OK")
        safeSlice.Remove(0)
    } else {
        fmt.Printf("Slice value %s value in message %s\n", uuid, msg.UUID)
        os.Exit(0)
    }

    return nil
}

func main() {

    logger := watermill.NewStdLogger(true, true)
    safeSlice = &SafeSlice{}

    pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        log.Fatalf("could not create router: %v", err)
    }

    // Create handlers for each service
    router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
    router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
    router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)

    // Start the router
    go func() {
        if err := router.Run(context.Background()); err != nil {
            log.Fatalf("could not run router: %v", err)
        }
    }()

    time.Sleep(1 * time.Second)

    for {
        // Publish a message to start the pipeline
        msg := message.NewMessage(watermill.NewUUID(), []byte{})
        if err := pubSub1.Publish("service-1-input", msg); err != nil {
            log.Fatalf("could not publish message: %v", err)
        }

        //time.Sleep(1000 * time.Millisecond)
    }
}
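The FIFO expectation in the question only holds while messages are handed over one at a time. The stdlib-only model below (no Watermill involved) contrasts a publish that returns only after the subscriber has taken the message with one that hands each message to its own goroutine and returns immediately. The `subscriber` type and the `publish`, `run` and `inOrder` helpers are hypothetical and deliberately simplified; this is a sketch of the general effect, not Watermill's GoChannel implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for a consumer that handles one
// message at a time (the mutex serialises deliveries).
type subscriber struct {
	mu       sync.Mutex
	received []int
}

// deliver records msg. The lock ensures only one delivery runs at a time,
// but it does not decide which waiting goroutine goes first.
func (s *subscriber) deliver(msg int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.received = append(s.received, msg)
}

// publish hands msg to sub. With block == true it returns only after the
// subscriber has taken the message; otherwise delivery happens in a new
// goroutine and publish returns immediately.
func publish(sub *subscriber, msg int, block bool, wg *sync.WaitGroup) {
	if block {
		sub.deliver(msg)
		return
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		sub.deliver(msg)
	}()
}

// run publishes 0..n-1 in order and returns the order in which they arrived.
func run(n int, block bool) []int {
	sub := &subscriber{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		publish(sub, i, block, &wg)
	}
	wg.Wait() // wait for any in-flight non-blocking deliveries
	return sub.received
}

// inOrder reports whether xs is exactly 0, 1, 2, ...
func inOrder(xs []int) bool {
	for i, x := range xs {
		if x != i {
			return false
		}
	}
	return true
}

func main() {
	const n = 10000
	fmt.Println("blocking publish preserves order:", inOrder(run(n, true)))
	// This result can differ from run to run.
	fmt.Println("non-blocking publish preserves order:", inOrder(run(n, false)))
}
```

The blocking variant always reports true. The non-blocking variant can report either value, which mirrors the intermittent failure described above. In Watermill, the setting that decides whether Publish waits for the subscriber is the BlockPublishUntilSubscriberAck field of gochannel.Config.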

Solution

  • You create a GoChannel with the default configuration, which in particular means BlockPublishUntilSubscriberAck = false.

        pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)
    

    Then you publish messages in an endless loop, in quick succession.

        for {
            msg := message.NewMessage(watermill.NewUUID(), []byte{})
            if err := pubSub1.Publish("service-1-input", msg); err != nil {
                log.Fatalf("could not publish message: %v", err)
            }
        }
    

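    Because Publish does not wait for an acknowledgement, several messages are in flight at the same time, and the subscription ends up running service1 in multiple goroutines, each appending a message UUID to the global variable safeSlice before republishing the message.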

    func service1(msg *message.Message) error {
        safeSlice.Append(msg.UUID)
        return pubSub1.Publish("service-2-input", msg)
    }
    

    It is entirely possible that one goroutine appends the first message to safeSlice, then another goroutine appends a second message to safeSlice and republishes it before the first message has been republished. Since both safeSlice.Append and pubSub1.Publish take locks, the goroutines contend with each other at exactly those points, so you don't have to wait long for this to happen.

    Depending on message order in event-based systems is really hard, and almost always unnecessary. Also, using mutable global variables in asynchronous systems can be very tricky.
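If order genuinely matters, one common alternative to the global safeSlice (a technique not taken from the answer above) is to let each message carry its own sequence number and have the last consumer put messages back in order. In Watermill that number could travel in the message metadata, for example via msg.Metadata.Set under a key of your choosing. The `Message` and `Reorderer` types below are hypothetical and stdlib-only, and the sketch assumes a single producer assigning consecutive numbers starting at 0 and a single consumer goroutine. A sequence number that never arrives would stall it, so a real system would also need gap or timeout handling.

```go
package main

import "fmt"

// Message is a hypothetical envelope: instead of a global slice, the producer
// stamps each payload with a sequence number that travels with it.
type Message struct {
	Seq     uint64
	Payload string
}

// Reorderer buffers early messages and releases them strictly by Seq.
// It is used by a single consumer goroutine, so it needs no mutex.
type Reorderer struct {
	next    uint64             // next sequence number allowed out
	pending map[uint64]Message // messages that arrived early
}

func NewReorderer() *Reorderer {
	return &Reorderer{pending: make(map[uint64]Message)}
}

// Push accepts one message and returns every message that is now
// deliverable in order (possibly none).
func (r *Reorderer) Push(m Message) []Message {
	if m.Seq < r.next {
		return nil // already emitted: treat as a duplicate and drop it
	}
	r.pending[m.Seq] = m
	var ready []Message
	for {
		msg, ok := r.pending[r.next]
		if !ok {
			break
		}
		ready = append(ready, msg)
		delete(r.pending, r.next)
		r.next++
	}
	return ready
}

func main() {
	r := NewReorderer()
	// Simulate the swap from the question: B (seq 1) arrives before A (seq 0).
	arrivals := []Message{{1, "B"}, {0, "A"}, {3, "D"}, {2, "C"}}
	for _, m := range arrivals {
		for _, out := range r.Push(m) {
			fmt.Println(out.Seq, out.Payload)
		}
	}
}
```

Running it prints 0 A, 1 B, 2 C, 3 D (one per line) even though B and D arrive early, and no shared mutable state is needed between services.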