Search code examples
go channel goroutine

Faster close of goroutines with channels


I'm new to Go, and I have a question about stopping goroutines with a channel signal.

I have long-running goroutines (more than 1000) and a manager to manage them:

func myThreadFunc(stop chan bool) {
    for {
        select {
        case <- stop:
            log.Debug("Stopping thread")
            return
        default:
            callClientTask() 
        }
    }
}

// callClientTask stands in for an external HTTP API call, which can take up
// to 30 seconds; here it simply blocks for five seconds.
func callClientTask() {
    const simulatedLatency = 5 * time.Second
    time.Sleep(simulatedLatency)
}


// manager starts 1000 goroutines running myThreadFunc, each with its own
// cancel channel, and signals every channel once a 300-second timeout fires.
func manager() {
    var cancelChannelSlice []chan bool
    for i := 0; i < 1000; i++ {
        // Unbuffered: a send on this channel blocks until the goroutine
        // receives from it.
        cancelChannel := make(chan bool)
        cancelChannelSlice = append(cancelChannelSlice, cancelChannel)

        go myThreadFunc(cancelChannel)
    }

    // stopTest signals the goroutines one at a time. Each send blocks until
    // that goroutine gets back to its select, i.e. until its current
    // callClientTask call returns, and only then does the loop move on to
    // the next channel.
    var stopTest = func() {
        for _, c := range cancelChannelSlice {
            c <- true
        }
    }

    timeout := time.After(time.Duration(300) * time.Second)
    for {
        select {
        case <-timeout:
            stopTest()
            // NOTE(review): there is no return or break here; timeout delivers
            // only one value, so after stopTest returns the loop keeps taking
            // the default branch and manager never exits.
        default:
            // Nothing to do yet; poll the timeout again in a second.
            time.Sleep(time.Second)
        }
    }
}

In this case, each time I call c <- true the manager waits for callClientTask() to finish and only then goes on to the next cancelChannel. I want all goroutines to stop within one iteration of callClientTask() (no more than 30 seconds).

The only way I have tried is to spawn new goroutines like this:

// stopTest signals every cancel channel without blocking the caller: each
// send is handed to its own short-lived goroutine.
var stopTest = func() {
        for _, c := range cancelChannelSlice {
            // c is passed as an argument so each goroutine works on its own
            // channel rather than on the shared loop variable.
            go func(c chan bool) {
                // Assuming c is unbuffered (as created in manager), this send
                // completes only once the worker has received the value, so
                // the close below cannot race ahead of the signal.
                c <- true
                close(c)
            }(c)
        }
    }

Is this the right way?


Solution

  • As I understand from your question, you want all goroutines to stop within one iteration of callClientTask() (no more than 30 seconds), and you want the workers to run concurrently without synchronization problems.

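    I reorganized the code so that the workers run concurrently and are tracked with a wait group. All workers share a single stop channel; closing it is observed by every receiver at once, so no per-worker send is needed.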

    Sample Code:

    package main
    
    import (
        "log"
        "sync"
        "time"
    )
    
    // worker calls callClientTask repeatedly until stop is closed (or a value
    // is received from it), logs that it is stopping, and marks itself done
    // on wg when it returns.
    func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()

    running:
        for {
            // Non-blocking poll: leave the loop as soon as stop is readable,
            // otherwise fall through and run one more task.
            select {
            case <-stop:
                break running
            default:
            }
            callClientTask()
        }

        log.Println("Stopping thread")
    }
    
    // callClientTask simulates a long-running task (for testing purposes) by
    // blocking for two seconds.
    func callClientTask() {
        const taskDuration = 2 * time.Second
        time.Sleep(taskDuration)
    }
    
    // main starts 1000 workers that share one stop channel, lets them run for
    // five seconds, then closes the channel and waits for every worker to
    // return.
    func main() {
        const workerCount = 1000

        done := make(chan struct{})
        var wg sync.WaitGroup

        // Register all workers up front, then launch them.
        wg.Add(workerCount)
        for n := 0; n < workerCount; n++ {
            go worker(done, &wg)
        }

        // Allow the workers to run for a while.
        time.Sleep(5 * time.Second)

        // A close is observed by every receiver, so this single call stops
        // all workers; Wait then blocks until each one has returned.
        close(done)
        wg.Wait()
    }
    

    Output:

    2023/10/26 10:40:44 Stopping thread
    2023/10/26 10:40:44 Stopping thread
    ....
    2023/10/26 10:40:49 Stopping thread
    2023/10/26 10:40:49 Stopping thread
    

    EDIT:

    You have to update the worker if you want to stop only some of the workers. The following code gives each worker its own 'stop' and 'stopped' channels, along with Start/Stop methods.

    Sample Code:

    package main
    
    import (
        "log"
        "sync"
        "time"
    )
    
    // Worker runs callClientTask in a loop on its own goroutine until it is
    // asked to stop.
    type Worker struct {
        stop     chan struct{} // closed by Stop to request shutdown
        stopped  chan struct{} // closed by the goroutine when it leaves its loop
        stopOnce sync.Once     // ensures stop is closed only once across Stop calls
    }

    // NewWorker returns a Worker that is ready to be started.
    func NewWorker() *Worker {
        return &Worker{
            stop:    make(chan struct{}),
            stopped: make(chan struct{}),
        }
    }

    // Start launches the worker goroutine and registers it with wg.
    //
    // The goroutine checks for a stop request only between callClientTask
    // calls, so a request is honoured once the call in progress has returned.
    func (w *Worker) Start(wg *sync.WaitGroup) {
        wg.Add(1)
        go func() {
            // Deferred calls run last-in first-out: stopped is closed first,
            // then the wait group is released.
            defer wg.Done()
            defer close(w.stopped)

            for {
                select {
                case <-w.stop:
                    log.Println("Stopping thread")
                    return
                default:
                    callClientTask()
                }
            }
        }()
    }

    // Stop asks the worker to stop and blocks until its goroutine has left
    // the loop. It is safe to call more than once and from several
    // goroutines; only the first call closes the stop channel.
    //
    // Stop waits for the worker's current callClientTask call to return, so
    // to stop many workers promptly, call Stop on each from its own
    // goroutine. Calling Stop on a Worker that was never started blocks
    // forever, because nothing will close stopped.
    func (w *Worker) Stop() {
        w.stopOnce.Do(func() { close(w.stop) })
        <-w.stopped
    }
    
    // callClientTask simulates a long-running task (for testing purposes) by
    // blocking for two seconds.
    func callClientTask() {
        time.Sleep(2000 * time.Millisecond)
    }
    
    // main starts 1000 workers, lets them run for five seconds, stops the
    // first 100 and then the remaining 900, and waits for all worker
    // goroutines to finish.
    func main() {
        var wg sync.WaitGroup
        workers := make([]*Worker, 1000)

        for i := range workers {
            workers[i] = NewWorker()
            workers[i].Start(&wg)
        }

        time.Sleep(5 * time.Second) // allow workers to run for a while

        stopAll(workers[:100]) // stop the first 100 workers
        stopAll(workers[100:]) // then stop the remaining workers
        wg.Wait()
    }

    // stopAll calls Stop on every given worker concurrently and returns once
    // all of them have stopped.
    //
    // Stop blocks until the worker finishes its current callClientTask call,
    // so calling it in a plain loop would stop the workers one after another;
    // issuing the calls in parallel bounds the wait by a single task duration.
    func stopAll(workers []*Worker) {
        var stopping sync.WaitGroup
        stopping.Add(len(workers))
        for _, w := range workers {
            // Pass w explicitly so each goroutine gets its own worker even
            // on Go versions that share the loop variable across iterations.
            go func(w *Worker) {
                defer stopping.Done()
                w.Stop()
            }(w)
        }
        stopping.Wait()
    }
    

    Output:

    2023/10/26 12:51:26 Stopping thread
    2023/10/26 12:51:28 Stopping thread
    2023/10/26 12:51:30 Stopping thread
    ....