I have a message consumer (eg: Kafka) that runs on a go routine with a for-select with a default case where it processes the received message:
type Consumer struct{}
func (c *Consumer) Start(ctx context.Context) {
fmt.Println("Consumer is starting")
defer func() {
fmt.Println("Consumer is stopping")
}()
i := 0
for {
select {
case <-ctx.Done():
fmt.Println("Context is done")
return
default:
// msg, err := kafka.FetchMessage(ctx)
fmt.Printf("[%d] Received message\n", i)
<-time.After(3 * time.Second)
fmt.Printf("[%d] Stopped processing\n", i)
i++
}
}
}
In my main function, I have a channel that listen for cancelation signals. Whenever the cancel signal is received, I want to cancel the go routine by calling the context cancel function.
func main() {
fmt.Println("Main Start")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumer := &Consumer{}
go func() {
consumer.Start(ctx)
}()
<-sigchan
fmt.Println("Received signal to shutdown")
cancel()
}
The requisites of the system are:
However, when I cancel the program by clicking CTRL + C, if a message is in processing the processing is not completed. How can I implement a feature that will cancel the consumer only after the last message is processed.
Go Playgroud: https://go.dev/play/p/kRUPPhwxwn7
The reason it exits immediately is because when main()
returns, all goroutines abruptly stopped. There's nothing waiting until consumer.Start
is finished.
Using signal.NotifyContext
is a bit more straight-forward, but we can accomplish it your way too.
// Your way, modified:
func main() {
fmt.Println("Main Start")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-sigchan
fmt.Println("Received signal to shutdown")
cancel()
}()
// Block until it exists, cancel in the background
consumer := &Consumer{}
consumer.Start(ctx)
}
// Allowing signal.NotifyContext to deal with it
func main() {
fmt.Println("Main Start")
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
consumer := &Consumer{}
consumer.Start(ctx)
fmt.Println("Shutting down")
}
Also, note that CTRL+C
is not exactly equivalent to kill -INT $PID
. The former will send SIGINT
to every process in the foreground proc's PGID
. This means forks may get killed before the main process can do it gracefully.