I'm new to Go, and I have a question about stopping goroutines with a channel signal.
I have long-running goroutines (more than 1000) and a manager to manage them:
// myThreadFunc calls callClientTask over and over until a value can be
// received from stop. The stop channel is only checked between calls, so a
// stop request is noticed once the call in progress has returned.
func myThreadFunc(stop chan bool) {
	for {
		select {
		case <-stop:
			log.Debug("Stopping thread")
			return
		default:
			// No stop request pending; go on to the next task.
		}
		callClientTask()
	}
}
// callClientTask stands in for an external HTTP API call, which can take up
// to 30 seconds; here it simply sleeps for five seconds.
func callClientTask() {
	const simulatedLatency = 5 * time.Second
	time.Sleep(simulatedLatency)
}
// manager starts 1000 goroutines running myThreadFunc, each with its own
// unbuffered cancel channel, lets them run for 300 seconds and then asks
// every one of them to stop.
//
// It returns once every goroutine has accepted its stop signal.
func manager() {
	const (
		workerCount = 1000
		runFor      = 300 * time.Second
	)

	cancelChannelSlice := make([]chan bool, 0, workerCount)
	for i := 0; i < workerCount; i++ {
		cancelChannel := make(chan bool)
		cancelChannelSlice = append(cancelChannelSlice, cancelChannel)
		go myThreadFunc(cancelChannel)
	}

	// stopTest signals the goroutines one at a time. The channels are
	// unbuffered, so each send blocks until the receiving goroutine gets back
	// to its select, i.e. until its current callClientTask call has returned.
	stopTest := func() {
		for _, c := range cancelChannelSlice {
			c <- true
		}
	}

	// Wait for the run time directly instead of polling once a second, and
	// return after stopping: a timer channel delivers only one value, so
	// looping again after the stop would sleep forever.
	time.Sleep(runFor)
	stopTest()
}
In this case, each time I call c <- true, the manager waits for callClientTask() to finish and only then goes on to the next cancelChannel.
I want all goroutines to stop within one iteration of callClientTask() (not more than 30 seconds).
The only way I have tried is to spawn new goroutines like this:
// stopTest hands every cancel channel to its own goroutine, which sends one
// stop value and then closes the channel, so stopTest itself never blocks on
// a send.
var stopTest = func() {
	for i := range cancelChannelSlice {
		ch := cancelChannelSlice[i]
		go func() {
			ch <- true
			close(ch)
		}()
	}
}
Is this the right way?
As I understand your question, you want all goroutines to stop within one iteration of callClientTask() (not more than 30 seconds), with the workers running concurrently and without synchronization problems.
I reorganized the code so that the workers run concurrently, are stopped by closing a single shared channel, and are awaited with a wait group.
Sample Code:
package main
import (
"log"
"sync"
"time"
)
// worker keeps calling callClientTask until a receive from stop succeeds
// (for example because stop has been closed), then logs and returns.
// wg.Done is called when the function exits.
func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	// stopRequested polls stop without blocking.
	stopRequested := func() bool {
		select {
		case <-stop:
			return true
		default:
			return false
		}
	}

	for !stopRequested() {
		callClientTask()
	}
	log.Println("Stopping thread")
}
// callClientTask simulates a long-running task (for testing purposes) by
// sleeping for two seconds.
func callClientTask() {
	const taskDuration = 2 * time.Second
	time.Sleep(taskDuration)
}
// main launches 1000 workers that share one stop channel, lets them run for
// five seconds, closes the channel to stop all of them at once, and waits
// for every worker to return.
func main() {
	const workerCount = 1000

	var (
		wg   sync.WaitGroup
		stop = make(chan struct{})
	)

	// Register all workers up front, before any of them can call Done.
	wg.Add(workerCount)
	for n := 0; n < workerCount; n++ {
		go worker(stop, &wg)
	}

	time.Sleep(5 * time.Second) // allow workers to run for a while
	close(stop)                 // a closed channel is readable by every worker
	wg.Wait()                   // wait for all workers
}
Output:
2023/10/26 10:40:44 Stopping thread
2023/10/26 10:40:44 Stopping thread
....
2023/10/26 10:40:49 Stopping thread
2023/10/26 10:40:49 Stopping thread
EDIT:
You have to change the worker if you want to stop only some of the workers. The following code gives each worker its own 'stop' and 'stopped' channels, along with Start and Stop methods.
Sample Code:
package main
import (
"log"
"sync"
"time"
)
// Worker is a handle to one background goroutine that can be stopped
// individually.
type Worker struct {
	// stop is closed by Stop to ask the goroutine to exit; stopped is closed
	// by the goroutine itself just before it returns.
	stop, stopped chan struct{}
}
// NewWorker returns a Worker whose stop and stopped channels are both
// freshly created and unbuffered. The worker does nothing until Start is
// called.
func NewWorker() *Worker {
	w := new(Worker)
	w.stop = make(chan struct{})
	w.stopped = make(chan struct{})
	return w
}
// Start registers the worker with wg and launches a goroutine that calls
// callClientTask repeatedly. Between calls the goroutine checks w.stop; once
// a receive from it succeeds, the goroutine logs, closes w.stopped and
// returns, after which wg.Done is called.
func (w *Worker) Start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-w.stop:
				log.Println("Stopping thread")
				close(w.stopped)
				return
			default:
				// No stop request pending; run the next task.
			}
			callClientTask()
		}
	}()
}
// Stop asks the worker's goroutine to exit and blocks until it has done so.
//
// The goroutine only checks for the request between callClientTask calls, so
// Stop can block for as long as the call in progress takes. Stop must only
// be called on a worker that has been started: stopped is closed solely by
// the goroutine launched in Start, so without it the final receive never
// completes.
//
// Calling Stop again after it has returned is a no-op. Stop is not safe to
// call from several goroutines at the same time.
func (w *Worker) Stop() {
	select {
	case <-w.stop:
		// Already closed by an earlier Stop; closing a closed channel panics.
	default:
		close(w.stop)
	}
	<-w.stopped
}
// callClientTask simulates a long-running task (for testing purposes); it
// blocks the caller for two seconds.
func callClientTask() {
	const simulatedWork = 2 * time.Second
	time.Sleep(simulatedWork)
}
// main starts 1000 workers, lets them run for five seconds, and then shuts
// them down in two batches: the first 100, then the remaining 900.
//
// Worker.Stop blocks until its worker has returned from the callClientTask
// call in progress, so calling it in a plain loop stops the workers one
// after another and can take up to one task duration per worker. Each batch
// is therefore stopped concurrently, which bounds the shutdown of a batch by
// roughly the duration of a single callClientTask call.
func main() {
	const (
		workerCount = 1000
		firstBatch  = 100
	)

	var wg sync.WaitGroup
	workers := make([]*Worker, workerCount)
	for i := range workers {
		workers[i] = NewWorker()
		workers[i].Start(&wg)
	}

	time.Sleep(5 * time.Second) // allow workers to run for a while

	// stopAll calls Stop on every worker in batch at the same time and
	// returns once all of them have stopped.
	stopAll := func(batch []*Worker) {
		var stopping sync.WaitGroup
		stopping.Add(len(batch))
		for _, w := range batch {
			// w is passed as an argument so each goroutine gets its own copy
			// regardless of the Go version's loop-variable semantics.
			go func(w *Worker) {
				defer stopping.Done()
				w.Stop()
			}(w)
		}
		stopping.Wait()
	}

	stopAll(workers[:firstBatch]) // stop first 100 workers
	stopAll(workers[firstBatch:]) // stop the remaining workers

	wg.Wait() // every worker goroutine has returned
}
Output:
2023/10/26 12:51:26 Stopping thread
2023/10/26 12:51:28 Stopping thread
2023/10/26 12:51:30 Stopping thread
....