Search code examples
go · apache-kafka · cron · consumer · confluent-kafka-go

Kafka consumer not pausing when needed


I'm using v2 of the confluent-kafka-go library (https://github.com/confluentinc/confluent-kafka-go) to consume from a topic that has multiple partitions.

When I start my consumer, everything looks fine. However, for business reasons I need to stop consuming during a specific period of time, and it seems that my call to the Pause method is not working.

I suspect it's because of kafka.PartitionAny, but I'm not sure about that.

This is my adapter:

// Settings shared by the Kafka consumer configuration.
const (
    // saslMechanismSha512 is the SASL mechanism name applied when credentials are configured.
    saslMechanismSha512 = "SCRAM-SHA-512"

    // sessionTimeoutInMs is the consumer session timeout: 3 minutes, in milliseconds.
    sessionTimeoutInMs = 3 * 60 * 1000
)

// KafkaAdapter wraps a confluent-kafka-go consumer. Its methods read messages,
// commit offsets and unsubscribe through that consumer.
type KafkaAdapter struct {
    // Consumer is the underlying client every adapter method delegates to.
    Consumer *kafka.Consumer
}

// NewKafkaAdapter subscribes consumer to topic (with no rebalance callback)
// and returns an adapter wrapping that consumer.
//
// ctx is currently unused. If the subscription fails, the underlying error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func NewKafkaAdapter(ctx context.Context, consumer *kafka.Consumer, topic string) (*KafkaAdapter, error) {
    if err := consumer.Subscribe(topic, nil); err != nil {
        return nil, fmt.Errorf("error subscribing to topic %s: %w", topic, err)
    }

    return &KafkaAdapter{Consumer: consumer}, nil
}

// Consume waits for the next message from the consumer and converts it into a
// port.Message.
//
// ctx is checked once, before the read: if it is already done, Consume returns
// ctx.Err() (context.Canceled or context.DeadlineExceeded, whichever ended the
// context) without touching the consumer. Cancellation that happens while
// ReadMessage is blocked is not observed until that call returns.
//
// Any error from ReadMessage is returned unchanged.
func (k *KafkaAdapter) Consume(ctx context.Context) (*port.Message, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }

    // A timeout of -1 keeps ReadMessage waiting indefinitely for a new message.
    message, err := k.Consumer.ReadMessage(-1)
    if err != nil {
        return nil, err
    }

    headers := getMessageHeaders(message.Headers)

    return &port.Message{
        Value:     message.Value,
        Key:       message.Key,
        Headers:   headers,
        Stream:    getStreamName(headers),
        Timestamp: message.Timestamp,
        Offset:    int64(message.TopicPartition.Offset),
    }, nil
}

// CommitMessage calls Commit on the underlying consumer and reports its error.
// The first value returned by Commit is discarded; ctx is not used.
func (k *KafkaAdapter) CommitMessage(ctx context.Context) error {
    if _, commitErr := k.Consumer.Commit(); commitErr != nil {
        return commitErr
    }

    return nil
}

// Unsubscribe calls Unsubscribe on the underlying consumer. ctx is not used.
func (k *KafkaAdapter) Unsubscribe(ctx context.Context) {
    // This method has no error return, so the consumer's result is discarded.
    _ = k.Consumer.Unsubscribe()
}

// SetupKafkaConsumer builds a consumer configuration from topic and creates
// the consumer.
//
// The configuration uses topic.Endpoints (comma-joined) as bootstrap servers,
// topic.Name as the consumer group id, a session timeout of
// sessionTimeoutInMs, and disables auto-commit. When both topic.User and
// topic.Password are set, SASL_SSL with the saslMechanismSha512 mechanism is
// configured; when only one of them is set a warning is logged and no SASL
// settings are applied.
//
// ctx is currently unused. Errors are returned to the caller instead of
// terminating the process, so the caller decides how to handle them.
func SetupKafkaConsumer(ctx context.Context, topic item.Topic) (*kafka.Consumer, error) {
    consumerConfig := &kafka.ConfigMap{
        "bootstrap.servers":  strings.Join(topic.Endpoints, ","),
        "group.id":           topic.Name,
        "session.timeout.ms": sessionTimeoutInMs,
        "enable.auto.commit": false,
    }

    hasUser, hasPassword := topic.User != "", topic.Password != ""

    switch {
    case hasUser && hasPassword:
        saslSettings := []struct {
            key   string
            value kafka.ConfigValue
        }{
            {"sasl.username", topic.User},
            {"sasl.password", topic.Password},
            {"security.protocol", "SASL_SSL"},
            {"sasl.mechanism", saslMechanismSha512},
            {"sasl.mechanisms", saslMechanismSha512},
        }

        for _, setting := range saslSettings {
            if err := consumerConfig.SetKey(setting.key, setting.value); err != nil {
                return nil, fmt.Errorf("setting consumer option %s: %w", setting.key, err)
            }
        }
    case hasUser || hasPassword:
        // Half-configured credentials would otherwise silently skip authentication.
        log.Printf("warning: only one of user/password is set for group %v; SASL is not configured", topic.Name)
    }

    consumer, err := kafka.NewConsumer(consumerConfig)
    if err != nil {
        return nil, fmt.Errorf("creating consumer for group %v: %w", topic.Name, err)
    }

    return consumer, nil
}

// getMessageHeaders converts Kafka headers into port.MessageHeader values,
// preserving their order. It returns nil when messageHeaders is empty.
func getMessageHeaders(messageHeaders []kafka.Header) []port.MessageHeader {
    if len(messageHeaders) == 0 {
        return nil
    }

    converted := make([]port.MessageHeader, len(messageHeaders))
    for i := range messageHeaders {
        converted[i] = port.MessageHeader{
            Key:   string(messageHeaders[i].Key),
            Value: messageHeaders[i].Value,
        }
    }

    return converted
}

// getStreamName returns the value of the first header whose key is "sn",
// converted to a string, or the empty string when no such header exists.
func getStreamName(headers []port.MessageHeader) string {
    for _, h := range headers {
        if h.Key == "sn" {
            return string(h.Value)
        }
    }

    return ""
}

And this is my main.go file:


// saslMechanismSha512 names the SCRAM-SHA-512 SASL mechanism.
const saslMechanismSha512 = "SCRAM-SHA-512"

// topicExample is the name of the topic this program consumes. It is declared
// as a variable (not a constant) because its address is taken in main.
var topicExample = "topic-example"

// main loads the configuration, creates the Kafka consumer and adapter, wires
// the message service, schedules two cron jobs that call Pause and Resume on
// the consumer, and then starts consuming until the context is cancelled by
// SIGINT/SIGTERM.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
    if err != nil {
        log.Fatalf("Error getting configurations: %v", err)
    }

    topicConfiguration := item.ReadTopic(appConfig, topicExample, false) // false = will get reader configuration

    consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
    if err != nil {
        log.Fatalf("error creating Kafka consumer: %v", err)
    }
    defer consumer.Close()

    // NOTE(review): this descriptor uses kafka.PartitionAny rather than a
    // concrete partition number. Pause/Resume presumably need the partitions
    // that are actually assigned (e.g. the result of consumer.Assignment()),
    // which would explain why pausing has no effect — confirm against the
    // library documentation.
    topicPartition := kafka.TopicPartition{
        Topic:     &topicExample,
        Offset:    kafka.OffsetStored,
        Partition: kafka.PartitionAny,
    }

    // NOTE(review): Assign is called here, and NewKafkaAdapter below calls
    // Subscribe on the same consumer. Mixing manual assignment with a group
    // subscription looks unintended — verify which one should remain.
    err = consumer.Assign([]kafka.TopicPartition{topicPartition})
    if err != nil {
        panic(fmt.Sprintf("error assigning topic/partitions: %v", err))
    }

    kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, topicExample)
    if err != nil {
        log.Fatalf("error creating Kafka adapter: %v", err)
    }

    repo, err := bootstrap.NewRepositories(appConfig)
    if err != nil {
        log.Fatalf("error creating a repository: %v", err)


    }

    dataManager := models.NewGormDataManager(repo.Db)

    messageService := service.NewMessageService(kafkaReader, dataManager)

    // Cancel the root context when the process receives SIGINT or SIGTERM.
    signalChannel := make(chan os.Signal, 1)
    signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChannel
        cancel()
    }()

    // Anything returned by AddFunc, Pause and Resume is discarded below, so a
    // failure in scheduling or pausing would go unnoticed.
    c := cron.New()
    c.AddFunc("22 18 * * *", func() {
        // Pause consumption at the start of the quiet window.
        consumer.Pause([]kafka.TopicPartition{topicPartition})
    })

    c.AddFunc("28 18 * * *", func() {
        // Resume consumption once the quiet window is over.
        consumer.Resume([]kafka.TopicPartition{topicPartition})
    })
    c.Start()

    messageService.StartConsuming(ctx, topicExample)

    // Wait until the context is cancelled before returning.
    <-ctx.Done()
}

I'm using consumer.Pause([]kafka.TopicPartition{topicPartition}), but with no effect.

I'm also not sure whether my consumer is connecting to all partitions or to only one.


Solution

  • I figured out a way to make it work.

    I replaced the call to Assign with a call to Subscribe; this way I don't have to worry about Kafka partition balancing.

    Also, instead of using the .Pause and .Resume methods, I used the Subscribe and Unsubscribe methods, which worked fine for me.

    Something like this:

    // main loads the configuration, builds the Kafka consumer, adapter and
    // message service, schedules cron jobs that subscribe and unsubscribe the
    // service, and then consumes until SIGINT/SIGTERM cancels the context.
    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        cfg, err := mdmcore.GetConfigurations("./config/config.toml")
        if err != nil {
            log.Fatalf("Error getting configurations: %v", err)
        }

        // Passing false asks for the reader configuration.
        topicCfg := item.ReadTopic(cfg, someTopic, false)

        consumer, err := adapter.SetupKafkaConsumer(ctx, topicCfg)
        if err != nil {
            log.Fatalf("error creating Kafka consumer: %v", err)
        }
        defer consumer.Close()

        reader, err := adapter.NewKafkaAdapter(ctx, consumer, someTopic)
        if err != nil {
            log.Fatalf("error creating Kafka adapter: %v", err)
        }

        repositories, err := bootstrap.NewRepositories(cfg)
        if err != nil {
            log.Fatalf("error creating a repository for my-topic flow: %v", err)
        }

        svc := service.NewMessageService(reader, models.NewGormDataManager(repositories.Db))

        // Cancel the root context on SIGINT or SIGTERM.
        stopSignals := make(chan os.Signal, 1)
        signal.Notify(stopSignals, os.Interrupt, syscall.SIGTERM)

        go func() {
            <-stopSignals
            cancel()
        }()

        // Scheduled callbacks: one re-subscribes the service to the topic, the
        // other unsubscribes it.
        subscribeJob := func() {
            svc.Subscribe(ctx, someTopic)
        }
        unsubscribeJob := func() {
            svc.Unsubscribe(ctx)
        }

        // NOTE(review): presumably 23:59 and 02:30 daily under five-field cron
        // syntax — confirm against the cron library version in use.
        scheduler := cron.New()
        scheduler.AddFunc("59 23 * * *", subscribeJob)
        scheduler.AddFunc("30 2 * * *", unsubscribeJob)
        scheduler.Start()

        svc.StartConsuming(ctx, someTopic)

        // Wait until the context is cancelled before returning.
        <-ctx.Done()
    }