I'm unable to terminate my WaitGroup in Go and consequently can't exit the range loop. Can anybody tell me why? Or suggest a better way of limiting the number of goroutines while still being able to exit when the channel is closed?
Most examples I have seen rely on a channel of fixed, known length, but this channel grows and shrinks dynamically as other processes feed it.
The print statement ("done !") in the example shows that testValProducer runs the right number of times, but the code never reaches ("--EXIT --"), which means wg.Wait is still blocking somehow.
type TestValContainer chan string

func StartFunc(){
	testValContainer := make(TestValContainer)
	go func(){testValContainer <- "string val 1"}()
	go func(){testValContainer <- "string val 2"}()
	go func(){testValContainer <- "string val 3"}()
	go func(){testValContainer <- "string val 4"}()
	go func(){testValContainer <- "string val 5"}()
	go func(){testValContainer <- "string val 6"}()
	go func(){testValContainer <- "string val 7"}()
	wg := sync.WaitGroup{}
	// limit the number of worker goroutines
	for i:=0; i < 3; i++ {
		wg.Add(1)
		go func(){
			v := i
			fmt.Printf("launching %v", i)
			for str := range testValContainer{
				testValProducer(str, &wg)
			}
			fmt.Println(v, "--EXIT --") // never called
		}()
	}
	wg.Wait()
	close(testValContainer)
}

func get(url string){
	http.Get(url)
	ch <- getUnvisited()
}

func testValProducer(testStr string, wg *sync.WaitGroup){
	doSomething(testStr)
	fmt.Println("done !") // called
	wg.Done() // NO EFFECT??
}
I might do something like this; it keeps everything easy to follow. I define a struct that holds a semaphore to control the number of active goroutines, plus a data channel that lets me read the values as they come in.
package main

import (
	"fmt"
	"sync"
)

// TestValContainer bundles the WaitGroup, the semaphore and the data channel.
type TestValContainer struct {
	wg   sync.WaitGroup
	sema chan struct{}
	data chan int
}

func doSomething(number int) {
	fmt.Println(number)
}

func main() {
	// semaphore: at most 10 goroutines are active at a time
	tvc := TestValContainer{
		sema: make(chan struct{}, 10),
		data: make(chan int),
	}
	for i := 0; i <= 100; i++ {
		tvc.wg.Add(1) // one Add per goroutine, matched by one Done below
		go func(i int) {
			tvc.sema <- struct{}{} // acquire a slot; blocks while 10 are in use
			defer func() {
				<-tvc.sema // release the slot
				tvc.wg.Done()
			}()
			tvc.data <- i
		}(i)
	}
	// wait in the background so that waiting and closing the channel don't
	// block the for loop below
	go func() {
		tvc.wg.Wait()
		close(tvc.data)
	}()
	// read results until the channel is closed
	for res := range tvc.data {
		doSomething(res)
	}
}
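As for why the original never exits, a likely reading is that the bookkeeping is circular: the workers only leave their range loop once the channel is closed, but the channel is only closed after wg.Wait returns. On top of that, wg.Add is called once per worker while wg.Done is called once per value, so the counter does not track what it is meant to track. The following is a minimal sketch of a fixed-size worker pool that keeps the question's shape, assuming the producers can signal when they have finished. The names runPool and process are hypothetical (process stands in for doSomething), and the uppercase conversion is only a placeholder for real work.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// process is a hypothetical stand-in for the question's doSomething.
func process(s string) string {
	return strings.ToUpper(s)
}

// runPool starts a fixed number of workers that range over in. It returns
// the processed values once in has been closed and fully drained.
func runPool(in <-chan string, workers int) []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out []string
	)
	for w := 0; w < workers; w++ {
		wg.Add(1) // one Add per worker goroutine
		go func() {
			defer wg.Done() // one Done per worker, when its range loop ends
			for s := range in {
				r := process(s)
				mu.Lock()
				out = append(out, r)
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // returns only after every worker has seen the channel close
	return out
}

func main() {
	in := make(chan string)

	// Producers get their own WaitGroup so that the channel is closed
	// exactly once, after the last send has completed.
	var producers sync.WaitGroup
	for i := 1; i <= 7; i++ {
		producers.Add(1)
		go func(n int) {
			defer producers.Done()
			in <- fmt.Sprintf("string val %d", n)
		}(i)
	}
	go func() {
		producers.Wait()
		close(in)
	}()

	results := runPool(in, 3)
	fmt.Println(len(results), "items processed") // prints "7 items processed"
}
```

The design choice here is that the sending side owns the close, and the worker WaitGroup counts workers rather than values. With three workers and any number of producers, the range loops end as soon as the producers are done, regardless of how many values passed through the channel.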