I have a goroutine that acts as a Kafka consumer: it reads messages from a topic
and spawns another goroutine for each message it receives. This consumer goroutine
is supposed to shut down when the application (the main goroutine) shuts down,
but I am having difficulty shutting it down properly.
Below is the Kafka consumer definition:
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"sync"
)
// EventConsumer is a callback that is invoked with a decoded event.
type EventConsumer func(event eventService.Event)
// KafkaConsumer bundles a kafka.Consumer with the event handlers and the
// synchronisation state used by Consume and Close.
type KafkaConsumer struct {
// done is sent to by Close and polled by the loop started in Consume.
done chan bool
// eventChannels holds the topic names passed to NewKafkaConsumer.
eventChannels []string
// consumer is the underlying Kafka client.
consumer *kafka.Consumer
// consumerMapping is indexed by an event's EntityType to find its handler.
consumerMapping map[string]EventConsumer
// wg counts the handler goroutines started by Consume; Close waits on it.
wg *sync.WaitGroup
}
// getKafkaConsumerConfigMap copies every entry of config into a new
// kafka.ConfigMap and returns it.
//
// An entry for which SetKey returns an error is logged and processing
// continues with the remaining entries, so a bad key never aborts start-up.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		if err := configMap.SetKey(key, value); err != nil {
			// log.Printf formats in one step; wrapping fmt.Sprintf in
			// log.Println formatted the message twice for no benefit.
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer creates a Kafka client from config, subscribes it to
// channels and returns it wrapped in a KafkaConsumer together with
// consumerMapping.
//
// The process exits through log.Fatalf if the client cannot be created or
// if the subscription fails.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	client, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err = client.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}

	return &KafkaConsumer{
		done:            make(chan bool, 1),
		eventChannels:   channels,
		consumer:        client,
		consumerMapping: consumerMapping,
		wg:              new(sync.WaitGroup),
	}
}
// getEvent decodes eventData as a protobuf-encoded eventService.Event.
//
// A decoding failure is only logged; the event is returned either way, so
// the result is never nil.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	decoded := new(eventService.Event)
	if err := proto.Unmarshal(eventData, decoded); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return decoded
}
// Consume starts a background goroutine that reads from the consumer's
// Events channel and starts one further goroutine per received message.
// Consume itself returns as soon as the background goroutine is started.
func (kc *KafkaConsumer) Consume() {
go func() {
run := true
for run == true {
// Non-blocking poll of kc.done: when nothing has been sent yet the
// default case is taken and execution falls through to the receive below.
select {
case sig := <-kc.done:
log.Println(fmt.Sprintf("Caught signal %v: terminating \n", sig))
run = false
return
default:
}
// Blocking receive: kc.done is not examined again until an event has
// arrived and the loop comes back around.
e := <-kc.consumer.Events()
switch event := e.(type) {
case kafka.AssignedPartitions:
// Log the assignment to stderr, then apply it to the consumer.
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
err := kc.consumer.Assign(event.Partitions)
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while assigning partitions.", err))
}
case kafka.RevokedPartitions:
// Log the revocation to stderr, then drop the current assignment.
_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", event)
err := kc.consumer.Unassign()
if err != nil {
log.Println(fmt.Sprintf("An error %v occurred while unassigning partitions.", err))
}
case *kafka.Message:
domainEvent := kc.getEvent(event.Value)
// Count the handler goroutine before starting it so kc.wg.Wait covers it.
kc.wg.Add(1)
// The closure reads the captured domainEvent; its event parameter is unused.
go func(event *eventService.Event) {
defer kc.wg.Done()
// Dispatch on EntityType; a missing map entry yields a nil handler.
if eventConsumer := kc.consumerMapping[domainEvent.EntityType]; eventConsumer != nil {
eventConsumer(*domainEvent)
} else {
log.Println(fmt.Sprintf("Event consumer not found for %v event type", domainEvent.EntityType))
}
}(domainEvent)
case kafka.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
case kafka.Error:
_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
}
}
}()
}
// Close waits for the handler goroutines tracked by kc.wg, then signals the
// Consume loop through kc.done, and finally closes the Kafka client. An
// error reported by the client is logged, not returned.
func (kc *KafkaConsumer) Close() {
	log.Println("Waiting")
	kc.wg.Wait()
	kc.done <- true
	log.Println("Done waiting")

	closeErr := kc.consumer.Close()
	if closeErr == nil {
		return
	}
	log.Printf("An error %v occurred while closing kafka consumer.", closeErr)
}
And below is the code for the main goroutine:
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
)
func main() {
eventConsumerMapping := map[string]svc.EventConsumer{"doctor-created": func(event eventService.Event) {
log.Println(fmt.Sprintf("Got event %v from kafka", event))
}}
consumerConfig := map[string]interface{}{
"bootstrap.servers": "localhost:9092",
"group.id": "catalog",
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"enable.partition.eof": true,
"auto.offset.reset": "earliest",
}
kafkaConsumer := svc.NewKafkaConsumer([]string{"doctor-created"}, consumerConfig, eventConsumerMapping)
kafkaConsumer.Consume()
kafkaConsumer.Close()
}
The problem is that the application sometimes does not terminate at all, and in some runs
the Consume function never gets to process anything. What am I missing here?
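Alright, here is the solution. The consumer goroutine should live as long as the main goroutine, and the main goroutine is itself meant to run indefinitely, so closing the consumer while its goroutine is still running isn't the correct approach. Instead, the consumer goroutine keeps running until the process receives SIGINT or SIGTERM, and main waits for it (and for any in-flight handlers) before closing the consumer.
The following solution works: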
package main
import (
"event-service/pkg/pb"
"event-service/pkg/svc"
"fmt"
"log"
"sync"
)
// main registers a logging handler for "doctor-created" events, starts the
// Kafka consumer, blocks until the WaitGroup handed to Consume is released,
// and only then closes the consumer.
func main() {
	handlers := map[string]svc.EventConsumer{
		"doctor-created": func(event eventService.Event) {
			log.Println(fmt.Sprintf("Got event %v from kafka", event))
		},
	}

	topics := []string{"doctor-created"}
	settings := map[string]interface{}{
		"bootstrap.servers":               "localhost:9092",
		"group.id":                        "catalog-2",
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"enable.partition.eof":            true,
		"auto.offset.reset":               "earliest",
	}

	kc := svc.NewKafkaConsumer(topics, settings, handlers)

	// pending is released once the consume loop and every handler it
	// started have finished.
	var pending sync.WaitGroup
	kc.Consume(&pending)
	pending.Wait()
	kc.Close()
}
The service definition
package svc
import (
"event-service/pkg/pb"
"fmt"
"github.com/gogo/protobuf/proto"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
// EventConsumer is a callback that is invoked with a decoded event.
type EventConsumer func(event eventService.Event)
// KafkaConsumer bundles a kafka.Consumer with the event handlers and the
// signal channel that Consume uses to decide when to stop.
type KafkaConsumer struct {
// done is allocated by NewKafkaConsumer; no method in this file reads or writes it.
done chan bool
// consumer is the underlying Kafka client.
consumer *kafka.Consumer
// consumerMapping is indexed by an event's EntityType to find its handler.
consumerMapping map[string]EventConsumer
// sigChan receives SIGINT and SIGTERM once Consume registers it with signal.Notify.
sigChan chan os.Signal
// channels holds the topic names passed to NewKafkaConsumer.
channels []string
}
// getKafkaConsumerConfigMap copies every entry of config into a new
// kafka.ConfigMap and returns it.
//
// An entry for which SetKey returns an error is logged and processing
// continues with the remaining entries, so a bad key never aborts start-up.
func getKafkaConsumerConfigMap(config map[string]interface{}) *kafka.ConfigMap {
	configMap := &kafka.ConfigMap{}
	for key, value := range config {
		if err := configMap.SetKey(key, value); err != nil {
			// log.Printf formats in one step; wrapping fmt.Sprintf in
			// log.Println formatted the message twice for no benefit.
			log.Printf("An error %v occurred while setting %v: %v", err, key, value)
		}
	}
	return configMap
}
// NewKafkaConsumer creates a Kafka client from config, subscribes it to
// channels and returns it wrapped in a KafkaConsumer together with
// consumerMapping and a buffered signal channel.
//
// The process exits through log.Fatalf if the client cannot be created or
// if the subscription fails.
func NewKafkaConsumer(channels []string, config map[string]interface{}, consumerMapping map[string]EventConsumer) *KafkaConsumer {
	client, err := kafka.NewConsumer(getKafkaConsumerConfigMap(config))
	if err != nil {
		log.Fatalf("An error %v occurred while starting kafka consumer.", err)
	}
	if err = client.SubscribeTopics(channels, nil); err != nil {
		log.Fatalf("An error %v occurred while subscribing to kafka topics %v.", err, channels)
	}

	return &KafkaConsumer{
		done:            make(chan bool, 1),
		consumer:        client,
		consumerMapping: consumerMapping,
		sigChan:         make(chan os.Signal, 1),
		channels:        channels,
	}
}
// getEvent decodes eventData as a protobuf-encoded eventService.Event.
//
// A decoding failure is only logged; the event is returned either way, so
// the result is never nil.
func (kc *KafkaConsumer) getEvent(eventData []byte) *eventService.Event {
	decoded := new(eventService.Event)
	if err := proto.Unmarshal(eventData, decoded); err != nil {
		log.Printf("An error %v occurred while un marshalling data from kafka.", err)
	}
	return decoded
}
// Consume starts the consumer loop in a background goroutine and returns
// immediately.
//
// The loop processes events from the Kafka client until SIGINT or SIGTERM
// arrives on kc.sigChan, or until the client's Events channel is closed.
// Every message is decoded and passed, in a goroutine of its own, to the
// EventConsumer registered for its EntityType.
//
// wg is incremented once for the loop and once per handler goroutine, so a
// wg.Wait in the caller returns only after the loop has stopped and every
// handler it started has finished. Call Close after that.
//
// Note that handler goroutines are not bounded: one is started per message.
func (kc *KafkaConsumer) Consume(wg *sync.WaitGroup) {
	signal.Notify(kc.sigChan, syscall.SIGINT, syscall.SIGTERM)

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Undo the Notify above once the loop ends, so later signals are
		// no longer relayed to a channel that nobody reads any more.
		defer signal.Stop(kc.sigChan)

		for {
			select {
			case sig := <-kc.sigChan:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				return
			case ev, ok := <-kc.consumer.Events():
				if !ok {
					// A closed channel yields nil on every receive; stop
					// instead of spinning on it.
					return
				}
				kc.handleEvent(ev, wg)
			}
		}
	}()
}

// handleEvent processes a single event received from the Kafka client.
func (kc *KafkaConsumer) handleEvent(ev kafka.Event, wg *sync.WaitGroup) {
	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
		if err := kc.consumer.Assign(e.Partitions); err != nil {
			log.Printf("An error %v occurred while assigning partitions.", err)
		}
	case kafka.RevokedPartitions:
		_, _ = fmt.Fprintf(os.Stderr, "%% %v\n", e)
		if err := kc.consumer.Unassign(); err != nil {
			log.Printf("An error %v occurred while unassigning partitions.", err)
		}
	case *kafka.Message:
		kc.dispatch(kc.getEvent(e.Value), wg)
	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		// Errors should generally be considered as informational, the
		// client will try to automatically recover.
		_, _ = fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
	}
}

// dispatch runs the handler registered for event's EntityType in a new
// goroutine that is tracked by wg.
func (kc *KafkaConsumer) dispatch(event *eventService.Event, wg *sync.WaitGroup) {
	// Add before starting the goroutine so a concurrent wg.Wait cannot miss it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		handler := kc.consumerMapping[event.EntityType]
		if handler == nil {
			log.Printf("Event consumer not found for %v event type", event.EntityType)
			return
		}
		handler(*event)
	}()
}
// Close closes the underlying Kafka client. An error reported by the client
// is logged, not returned.
func (kc *KafkaConsumer) Close() {
	closeErr := kc.consumer.Close()
	if closeErr == nil {
		return
	}
	log.Printf("An error %v occurred while closing kafka consumer.", closeErr)
}