Suppose there are 10 senders and one receiver on a channel. Each sender runs a function that takes some time to return a value. The receiver only wants one value (the first one received) from the channel; the other 9 values are unused, and the receiver shouldn't have to wait for them. That's why I didn't use sync.WaitGroup.
I used a buffered channel, so 9 values remain in the buffer after the receiver takes only the first one. My questions are:
Is it OK to leave a buffered channel that still holds data open when there is no receiver? The example code below is simplified, but if the program is a daemon, will the channel be garbage-collected eventually?
Is there a better way to handle this situation? I tried to use a cancellation channel but failed, and I'm not sure whether context fits here.
Example code:
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	i, err := getRandomInt()
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(i)
	}
	fmt.Println("Waiting goroutines to be finished...")
	time.Sleep(2 * time.Second)
}
func getRandomInt() (int, error) {
	ch := make(chan int, 10)
	// 10 senders
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer fmt.Printf("Goroutine #%d finished\n", i)
			fmt.Printf("Goroutine #%d started\n", i)
			data := heavyJob()
			ch <- data
			fmt.Printf("Goroutine #%d sent data %d to ch\n", i, data)
		}(i)
	}
	// 1 receiver
	timeout := time.After(2000 * time.Millisecond)
	for {
		select {
		case value := <-ch:
			// uses only the value received first, the rest are discarded
			return value, nil
		case <-timeout:
			return -1, errors.New("Timeout")
		}
	}
}
// takes 1900~2900ms to finish
func heavyJob() int {
	r := rand.Intn(1000)
	time.Sleep(time.Duration(r+1900) * time.Millisecond)
	return r
}
To answer the main questions:
Essentially you are creating an implicit coupling between the number of workers and the size of the buffered channel: change one of those two numbers without changing the other and senders will block forever, leaking goroutines. (As a side note, buffered channels are usually for the case where consumers and producers work at the same average rate, but one of them has spiky rather than steady output; the buffer smooths out the peaks and troughs.)
With this in mind, I suggest it would be better to be explicit about managing the fact that you do not want all the values.
Here is an updated version of the getRandomInt() function. Note the setup of context cancellations using defer at the top, and the use of the select statement on send.
func getRandomInt() (int, error) {
	ctx := context.Background() // creates a fresh, empty context
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancels the context when getRandomInt() returns
	ch := make(chan int)
	// 10 senders
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer fmt.Printf("Goroutine #%d finished\n", i)
			fmt.Printf("Goroutine #%d started\n", i)
			data := heavyJob()
			// This select blocks until either this goroutine is the first
			// to send, or the context is cancelled, in which case another
			// goroutine has already sent and this one discards its value.
			select {
			case ch <- data:
				fmt.Printf("Goroutine #%d sent data %d to ch\n", i, data)
			case <-ctx.Done():
				fmt.Printf("Goroutine #%d did not send, context is cancelled, would have sent data %d to ch\n", i, data)
			}
		}(i)
	}
	// 1 receiver
	timeout := time.After(2000 * time.Millisecond)
	select {
	case value := <-ch:
		// uses only the value received first, the rest are discarded
		return value, nil
	case <-timeout:
		return -1, errors.New("Timeout")
	}
}
Setting up the context with a cancellation means that the context becomes "Done" once the cancel() function is called. This is a way to tell all the sender goroutines not to bother waiting to send. On the sending side, the select statement blocks until either the context is cancelled by cancel(), or the receiver reads the first value.
I've also removed the buffering from the channel, as there's no need for it anymore.