Search code examples
go · goroutine

Properly close a Go routine which runs an infinite loop


I have a goroutine which basically acts as a Kafka consumer: it reads messages from a topic and then spawns another goroutine for each message it receives. This consumer goroutine is supposed to shut down when the application (the main goroutine) shuts down, but I am having difficulty shutting it down properly. Below is the Kafka consumer definition.

    package svc    

import (
    "event-service/pkg/pb"
    "fmt"
    "github.com/gogo/protobuf/proto"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "log"
    "os"
    "sync"
)    

// EventConsumer is a callback that handles a single decoded domain event.
type EventConsumer func(event eventService.Event)    

// KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
// events to per-entity-type EventConsumer callbacks.
type KafkaConsumer struct {
    done            chan bool // buffered(1); signals the consume loop to stop
    eventChannels   []string  // subscribed topic names
    consumer        *kafka.Consumer
    consumerMapping map[string]EventConsumer // entity type -> handler
    wg              *sync.WaitGroup          // tracks in-flight per-message handler goroutines
}    

// getKafkaConsumerConfigMap builds a *kafka.ConfigMap from a plain settings
// map. Keys that fail to set are logged and skipped rather than aborting.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
    configMap := &kafka.ConfigMap{}
    for key, value := range config {
        // log.Printf replaces the log.Println(fmt.Sprintf(...)) anti-idiom.
        if err := configMap.SetKey(key, value); err != nil {
            log.Printf("An error %v occurred while setting %v: %v", err, key, value)
        }
    }
    return configMap
}

// NewKafkaConsumer connects to Kafka with the given config, subscribes to
// the given channels (topics), and returns a consumer that dispatches events
// via consumerMapping. Exits the process on connection/subscription failure.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
    consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
    if err != nil {
        log.Fatalf("An error %v occurred while starting kafka consumer.", err)
    }
    if err = consumer.SubscribeTopics(channels, nil); err != nil {
        log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
    }
    var wg sync.WaitGroup
    return &KafkaConsumer{
        eventChannels:   channels,
        done:            make(chan bool, 1),
        wg:              &wg,
        consumer:        consumer,
        consumerMapping: consumerMapping,
    }
}

// getEvent unmarshals raw Kafka message bytes into a domain Event. On
// unmarshal failure the error is logged and the (possibly zero-valued)
// event is still returned, matching the original best-effort behavior.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
    event := eventService.Event{}
    // log.Printf replaces the log.Println(fmt.Sprintf(...)) anti-idiom.
    if err := proto.Unmarshal(eventData, &event); err != nil {
        log.Printf("An error %v occurred while un marshalling data from kafka.", err)
    }
    return &event
}

// Consume starts a goroutine that reads from the consumer's Events channel
// and dispatches each *kafka.Message to its registered EventConsumer in a
// new goroutine tracked by kc.wg.
//
// Bug fix: the original loop polled kc.done with a non-blocking select
// (default case) and then blocked unconditionally on kc.consumer.Events(),
// so a shutdown signal was only observed after the NEXT event arrived —
// the loop could hang forever on an idle topic. Receiving from both
// channels in a single select makes shutdown immediate.
func (kc *KafkaConsumer) Consume() {
    go func() {
        for {
            select {
            case sig := <-kc.done:
                log.Printf("Caught signal %v: terminating", sig)
                return
            case e := <-kc.consumer.Events():
                switch event := e.(type) {
                case kafka.AssignedPartitions:
                    _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
                    if err := kc.consumer.Assign(event.Partitions); err != nil {
                        log.Printf("An error %v occurred while assigning partitions.", err)
                    }
                case kafka.RevokedPartitions:
                    _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
                    if err := kc.consumer.Unassign(); err != nil {
                        log.Printf("An error %v occurred while unassigning partitions.", err)
                    }
                case *kafka.Message:
                    domainEvent := kc.getEvent(event.Value)
                    kc.wg.Add(1)
                    // Hand the event to the goroutine as a parameter (the
                    // original shadowed its parameter and captured the loop
                    // variable instead).
                    go func(de *eventService.Event) {
                        defer kc.wg.Done()
                        if eventConsumer := kc.consumerMapping[de.EntityType]; eventConsumer != nil {
                            eventConsumer(*de)
                        } else {
                            log.Printf("Event consumer not found for %v event type", de.EntityType)
                        }
                    }(domainEvent)
                case kafka.PartitionEOF:
                    fmt.Printf("%% Reached %v\n", e)
                case kafka.Error:
                    _, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                }
            }
        }
    }()
}

// Close stops the consume loop, waits for in-flight message handlers, and
// closes the underlying Kafka consumer.
//
// Bug fix: the original waited on kc.wg BEFORE sending on kc.done, so the
// consume loop kept running (and spawning new handler goroutines) while
// Close was waiting, and the app could fail to terminate. The stop signal
// must be sent first; the buffered channel makes the send non-blocking.
func (kc *KafkaConsumer) Close() {
    kc.done <- true // stop the consume loop (buffered(1), never blocks)
    log.Println("Waiting")
    kc.wg.Wait()
    log.Println("Done waiting")
    if err := kc.consumer.Close(); err != nil {
        log.Printf("An error %v occurred while closing kafka consumer.", err)
    }
}

And below is the main goroutine (the `main` function) code:

    package main    

import (
    "event-service/pkg/pb"
    "event-service/pkg/svc"
    "fmt"
    "log"
)    

func main() {
    // Map the "doctor-created" entity type to a handler that just logs the event.
    eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
        log.Println(fmt.Sprintf("Got event %v from kafka", event))
    }}
    consumerConfig := map[string]interface{}{
        "bootstrap.servers":               "localhost:9092",
        "group.id":                        "catalog",
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "enable.partition.eof":            true,
        "auto.offset.reset":               "earliest",
    }
    kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
    kafkaConsumer.Consume()
    // NOTE(review): Close is called immediately after Consume returns
    // (Consume only STARTS the goroutine), so nothing keeps main alive long
    // enough for the consumer to run — this is the source of the
    // nondeterministic "sometimes never ends / sometimes never consumes"
    // behavior described below.
    kafkaConsumer.Close()
}

The problem here is that sometimes the application does not end at all, and in some runs it never executes the consume function. What am I missing here?


Solution

  • Here is the solution. Since the consumer goroutine should live as long as the main goroutine, and the main goroutine itself runs indefinitely, closing the consumer goroutine from the main goroutine while both are still running is not the correct approach. Instead, let the consumer goroutine listen for OS termination signals, and have the main goroutine wait (via a WaitGroup) for it to finish before closing the consumer.

    So the following solution works

    package main    
    
    import (
        "event-service/pkg/pb"
        "event-service/pkg/svc"
        "fmt"
        "log"
        "sync"
    )    
    
    // main wires a "doctor-created" handler into a KafkaConsumer, starts the
    // consume loop, and blocks until it exits on SIGINT/SIGTERM, after which
    // the consumer is closed.
    func main() {
        handlers := map[string]svc.EventConsumer{
            "doctor-created": func(event eventService.Event) {
                log.Printf("Got event %v from kafka", event)
            },
        }
        cfg := map[string]interface{}{
            "bootstrap.servers":               "localhost:9092",
            "group.id":                        "catalog-2",
            "session.timeout.ms":              6000,
            "go.events.channel.enable":        true,
            "go.application.rebalance.enable": true,
            "enable.partition.eof":            true,
            "auto.offset.reset":               "earliest",
        }
        consumer := svc.NewKafkaConsumer([]string{"doctor-created"}, cfg, handlers)
        var wg sync.WaitGroup
        consumer.Consume(&wg)
        wg.Wait() // blocks until the consume goroutine sees SIGINT/SIGTERM
        consumer.Close()
    }
    

    The service definition

    package svc    
    
    import (
        "event-service/pkg/pb"
        "fmt"
        "github.com/gogo/protobuf/proto"
        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
        "log"
        "os"
        "os/signal"
        "sync"
        "syscall"
    )    
    
    // EventConsumer is a callback that handles a single decoded domain event.
    type EventConsumer func(event eventService.Event)    
    
    // KafkaConsumer wraps a confluent-kafka consumer and dispatches decoded
    // events to per-entity-type EventConsumer callbacks. Shutdown is driven
    // by OS signals delivered on sigChan.
    type KafkaConsumer struct {
        done            chan bool // NOTE(review): created but never used in this version — candidate for removal
        consumer        *kafka.Consumer
        consumerMapping map[string]EventConsumer // entity type -> handler
        sigChan         chan os.Signal           // receives SIGINT/SIGTERM (see Consume)
        channels        []string                 // subscribed topic names
    }    
    
    // getKafkaConsumerConfigMap builds a *kafka.ConfigMap from a plain settings
    // map. Keys that fail to set are logged and skipped rather than aborting.
    func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
        configMap := &kafka.ConfigMap{}
        for key, value := range config {
            // log.Printf replaces the log.Println(fmt.Sprintf(...)) anti-idiom.
            if err := configMap.SetKey(key, value); err != nil {
                log.Printf("An error %v occurred while setting %v: %v", err, key, value)
            }
        }
        return configMap
    }
    
    // NewKafkaConsumer connects to Kafka with the given config, subscribes to
    // the given channels (topics), and returns a consumer that dispatches
    // events via consumerMapping. Exits the process on connection or
    // subscription failure.
    func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
        consumer, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
        if err != nil {
            log.Fatalf("An error %v occurred while starting kafka consumer.", err)
        }
        if err = consumer.SubscribeTopics(channels, nil); err != nil {
            log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
        }
        return &KafkaConsumer{
            channels:        channels,
            sigChan:         make(chan os.Signal, 1),
            done:            make(chan bool, 1),
            consumer:        consumer,
            consumerMapping: consumerMapping,
        }
    }
    
    // getEvent unmarshals raw Kafka message bytes into a domain Event. On
    // unmarshal failure the error is logged and the (possibly zero-valued)
    // event is still returned, matching the original best-effort behavior.
    func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
        event := eventService.Event{}
        // log.Printf replaces the log.Println(fmt.Sprintf(...)) anti-idiom.
        if err := proto.Unmarshal(eventData, &event); err != nil {
            log.Printf("An error %v occurred while un marshalling data from kafka.", err)
        }
        return &event
    }
    
    // Consume registers for SIGINT/SIGTERM and starts the consume loop in a
    // goroutine tracked by the caller's WaitGroup. Each *kafka.Message is
    // dispatched to its registered EventConsumer in a further goroutine, also
    // tracked by wg, so wg.Wait() in main covers both the loop and any
    // in-flight handlers.
    //
    // Fixes over the posted solution: Assign/Unassign errors were silently
    // discarded with `_ =` (the question's version logged them — restore
    // that), the `for run == true` flag is replaced by a plain loop with
    // `return`, and the handler closure now uses its own parameter instead
    // of ignoring it and capturing the loop variable.
    func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
        signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case sig := <-kc.sigChan:
                    fmt.Printf("Caught signal %v: terminating\n", sig)
                    return
                case ev := <-kc.consumer.Events():
                    switch e := ev.(type) {
                    case kafka.AssignedPartitions:
                        _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
                        if err := kc.consumer.Assign(e.Partitions); err != nil {
                            log.Printf("An error %v occurred while assigning partitions.", err)
                        }
                    case kafka.RevokedPartitions:
                        _, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
                        if err := kc.consumer.Unassign(); err != nil {
                            log.Printf("An error %v occurred while unassigning partitions.", err)
                        }
                    case *kafka.Message:
                        domainEvent := kc.getEvent(e.Value)
                        wg.Add(1)
                        go func(de *eventService.Event) {
                            defer wg.Done()
                            if eventConsumer := kc.consumerMapping[de.EntityType]; eventConsumer != nil {
                                eventConsumer(*de)
                            } else {
                                log.Printf("Event consumer not found for %v event type", de.EntityType)
                            }
                        }(domainEvent)
                    case kafka.PartitionEOF:
                        fmt.Printf("%% Reached %v\n", e)
                    case kafka.Error:
                        // Errors should generally be considered as informational, the client will try to automatically recover
                        _, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                    }
                }
            }
        }()
    }
    
    // Close shuts down the underlying Kafka consumer, logging any error.
    // Call only after the consume goroutine has exited (wg.Wait() in main),
    // otherwise the loop may receive from a closed Events channel.
    func (kc *KafkaConsumer) Close() {
        // log.Printf replaces the log.Println(fmt.Sprintf(...)) anti-idiom.
        if err := kc.consumer.Close(); err != nil {
            log.Printf("An error %v occurred while closing kafka consumer.", err)
        }
    }