
How to safely close a chan chan T in Go?


I'm implementing a simple worker pool in which one sender (the dispatcher) sends jobs to M worker goroutines. It uses a channel of channels to hand each available job to the first idle worker:

// builds the pool
func NewWorkerPool(maxWorkers int) WorkerPool {
    pool := make(chan chan Job, maxWorkers)
    workers := make([]Worker, 0)
    return WorkerPool{
        WorkerPool: pool,
        Workers: workers,
        maxWorkers: maxWorkers,
        waitGroup: sync.WaitGroup{}}
}

// Starts the WorkerPool
func (p *WorkerPool) Run(queue chan Job) {
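    // take a pointer: copying a sync.WaitGroup would leave p.waitGroup at zero,
    // so a later p.waitGroup.Wait() would return without waiting for the workers
    w := &p.waitGroup

    // starting n number of workers
    for i := 0; i < p.maxWorkers; i++ {
        worker := NewWorker(p.WorkerPool)
        p.Workers = append(p.Workers, worker)
        w.Add(1)
        worker.Start(w)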
    }

    go p.dispatch(queue)
}

// dispatches a job to be handled by an idle Worker of the pool
func (p *WorkerPool) dispatch(jobQueue chan Job) {
    for {
        select {
        case job := <-jobQueue:
            // a model request has been received
            go func(job Job) {
                // try to obtain a worker model channel that is available.
                // this will block until a worker is idle
                jobChannel := <-p.WorkerPool

                // dispatch the model to the worker model channel
                jobChannel <- job
            }(job)
        }
    }
}


// checks if a Worker Pool is open or closed - If we can receive on the channel then it is NOT closed
func (p *WorkerPool) IsOpen() bool {
    _, ok := <-p.WorkerPool
    return ok
}

The worker's Start and Stop methods:

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start(wg *sync.WaitGroup) {
    go func() {
        defer wg.Done()
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                result := job.Run()
                job.ReturnChannel <- result

                // once result is returned close the job output channel
                close(job.ReturnChannel)

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

Now I'm trying to close the pool with the following method. I use a sync.WaitGroup to wait for all the workers to shut down:

// stops the Pool
func (p *WorkerPool) Stop() bool {
    // stops all workers
    for _, worker := range p.Workers {
        worker.Stop()
    }
    p.waitGroup.Wait() //Wait for the goroutines to shutdown

    close(p.WorkerPool)

    more := p.IsOpen()

    fmt.Printf(" more? %t", more)

    return more
}

// prints: more? true

Even though I wait for the workers to quit and then call close(p.WorkerPool), the channel still appears to be open. What is missing here, and how do I close the channels properly?


Solution

  • Closing a channel indicates that no more values will be sent to it. This can be useful to communicate completion to the channel’s receivers.

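    Closing does not discard values that are already buffered. Receives on the closed channel keep returning them, with ok set to true, until the buffer is empty; only then does a receive return the zero value with ok set to false. That is most likely why IsOpen still reports true right after close(p.WorkerPool): the job channels that the workers registered before quitting are still sitting in the buffer. You may have to close the channel and then drain the channels left inside it, like the following: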

    // Stop stops the Pool and free all the channels
    func (p *WorkerPool) Stop() bool {
        // stops all workers
        for _, worker := range p.Workers {
            worker.Stop()
        }
        p.waitGroup.Wait() //Wait for the goroutines to shutdown
        close(p.WorkerPool)
        for range p.WorkerPool {
            fmt.Println("Freeing channel") //remove all the channels
        }
        more := p.IsOpen()
        fmt.Printf(" more? %t", more)
    
        return more
    }
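
The behavior described above can be checked in isolation. This is only a minimal sketch, not the pool from the question: okFlagsAfterClose is a hypothetical helper that fills a buffered chan chan int, closes it, and records the ok flag of every receive until the channel reports that it is closed and empty.

```go
package main

import "fmt"

// okFlagsAfterClose is a hypothetical helper for illustration only.
// It buffers n channels, closes the outer channel, and then receives
// until ok is false, returning the ok flags in the order observed.
func okFlagsAfterClose(n int) []bool {
	ch := make(chan chan int, n)
	for i := 0; i < n; i++ {
		ch <- make(chan int)
	}
	close(ch) // no more sends are allowed, but the n buffered values remain

	var flags []bool
	for {
		_, ok := <-ch
		flags = append(flags, ok)
		if !ok {
			// closed and drained: the receive returned the zero value
			return flags
		}
	}
}

func main() {
	fmt.Println(okFlagsAfterClose(2)) // prints [true true false]
}
```

The first two receives succeed after close because they return buffered values. Only the third one sees ok == false, which matches the "more? true" output in the question.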
    

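    BTW, _, ok := <-ch is not a reliable way to check whether a channel is closed: it blocks on an open, empty channel, it consumes a value when one is available, and it returns ok == true for a closed channel that still holds buffered values. I would suggest a different name for the function.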
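
If a non-destructive "is it closed?" check is really needed, one common approach is to track the closed state separately instead of receiving from the data channel. The sketch below assumes a hypothetical closer type (it is not part of the WorkerPool in the question) built on a done channel that never carries values, so a receive on it can only succeed after close.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical helper, not part of the original WorkerPool.
// The done channel never carries values; it is only ever closed.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close marks the closer as closed. sync.Once makes repeated calls safe,
// since closing an already closed channel would panic.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// IsClosed reports whether Close has been called. It never blocks and
// never consumes data, because nothing is ever sent on done.
func (c *closer) IsClosed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	c := newCloser()
	fmt.Println(c.IsClosed()) // prints false
	c.Close()
	c.Close() // second call is a no-op
	fmt.Println(c.IsClosed()) // prints true
}
```

A pool could embed something like this and call Close from its Stop method, but whether that fits depends on how the rest of the pool is structured. Note that the result can be stale by the time the caller acts on it if another goroutine calls Close concurrently.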