
How does this goroutine keep running continuously?


My basic understanding of a goroutine is that it's a simplified way to create a thread.

Looking at the confluent-kafka-go library, the following code is given as an example:

    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "myTopic"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

How does this work, though? Wouldn't the goroutine run once and stop as soon as it has looped through everything in p.Events()? How does Go know not to end the goroutine but to keep polling p.Events(), even though it will be empty most of the time?


Solution

  • According to the documentation for Producer.Events(), it returns a channel.

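    Ranging over a channel terminates only when the channel is closed. While the channel is open but empty, the loop does not exit and does not busy-poll: the receive blocks, and the goroutine sleeps until the next value arrives. See the Tour of Go for more details.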
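
The same behaviour can be reproduced without Kafka. The following is a minimal sketch using only the standard library. The `consume` helper and the `events` and `done` channels are hypothetical names made up for this illustration and are not part of confluent-kafka-go. It shows a `range` loop that waits while the channel is empty and returns only once the channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// consume ranges over events and returns everything it received.
// Hypothetical helper for illustration; it stands in for the
// goroutine that ranges over p.Events() in the question.
func consume(events <-chan string) []string {
	var received []string
	// The loop blocks whenever the channel is open but empty.
	for e := range events {
		received = append(received, e)
	}
	// This line is reached only after the channel has been closed.
	return received
}

func main() {
	events := make(chan string)
	done := make(chan []string)

	// Start the consumer in its own goroutine.
	go func() { done <- consume(events) }()

	for _, word := range []string{"Welcome", "to", "Go"} {
		// The consumer is blocked inside the range loop during this pause.
		time.Sleep(50 * time.Millisecond)
		events <- word
	}

	// Closing the channel is what finally ends the range loop.
	close(events)

	fmt.Println(<-done) // prints: [Welcome to Go]
}
```

If the `close(events)` call is removed, `consume` never returns and the final receive from `done` waits forever. That is the reason the goroutine in the question stays alive between deliveries.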