
Always have x number of goroutines running at any time


I see lots of tutorials and examples on how to make Go wait for x number of goroutines to finish, but what I'm trying to do is ensure there are always x of them running, so that a new goroutine is launched as soon as one ends.

Specifically, I have a few hundred thousand 'things to do', each of which processes some data coming out of MySQL. So it works like this:

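db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows, err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close() // closes the result set when the function returns

var id uint
for rows.Next() {
    err := rows.Scan(&id)
    checkErr(err)
    go processTheThing(id) // one goroutine per row, with no upper limit
}
checkErr(rows.Err()) // reports any error that ended the iteration early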

Currently that will launch several hundred thousand processTheThing() goroutines at once. What I need is for at most x (we'll call it 20) goroutines to be running. So it starts by launching 20 for the first 20 rows, and from then on it launches a new goroutine for the next id the moment one of the current goroutines finishes. That way there are always 20 running at any point in time.

I'm sure this is quite simple/standard, but I can't find a good explanation of how it's done in any of the tutorials or examples.
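A minimal sketch of the literal behaviour asked for here (one goroutine per id, but never more than 20 running at once) uses a buffered channel as a counting semaphore: the loop puts a token into the channel before each `go` statement, which blocks while all 20 slots are taken, and each goroutine takes its token back out when it finishes. This is a swapped-in technique, not the approach from the solution below. `processTheThing` here is a hypothetical stand-in that only sleeps, the `ids` slice stands in for the `rows.Next()` loop, and `runLimited` plus its peak counter are illustrative additions used only to show that the limit holds.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxConcurrent is the "x" from the question.
const maxConcurrent = 20

// processTheThing is a hypothetical stand-in for the real per-row work.
// It sleeps briefly so that goroutines actually overlap.
func processTheThing(id uint) {
	time.Sleep(time.Millisecond)
}

// runLimited starts one goroutine per id but never lets more than limit
// of them run at the same time. It returns the highest number of
// goroutines observed running simultaneously.
func runLimited(ids []uint, limit int) int64 {
	sem := make(chan struct{}, limit) // each buffered slot is one "running" permit
	var wg sync.WaitGroup
	var running, peak int64

	for _, id := range ids {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(id uint) { // id passed as an argument so each goroutine gets its own copy
			defer wg.Done()
			defer func() { <-sem }() // runs first (LIFO): frees the slot for the next id

			// Record the current concurrency and update the peak if it grew.
			n := atomic.AddInt64(&running, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			processTheThing(id)
			atomic.AddInt64(&running, -1)
		}(id)
	}
	wg.Wait() // wait for the last batch to finish
	return atomic.LoadInt64(&peak)
}

func main() {
	ids := make([]uint, 1000)
	for i := range ids {
		ids[i] = uint(i)
	}
	fmt.Println("peak concurrency:", runLimited(ids, maxConcurrent))
}
```

Applied to the question's loop, this amounts to adding `sem <- struct{}{}` just before the `go` statement and wrapping `processTheThing(id)` in a closure that releases the slot when it returns, plus a `sync.WaitGroup` so the function doesn't return while the final goroutines are still running.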


Solution

  • Thanks to everyone for helping me out with this. However, I don't feel that anyone really provided something that both worked and was simple/understandable, although you did all help me understand the technique.

    What I have done in the end is, I think, much more understandable and practical as an answer to my specific question, so I will post it here in case anyone else has the same question.

    Somehow this ended up looking a lot like what OneOfOne posted, which is great because now I understand that. But I found OneOfOne's code very difficult to understand at first, because passing functions to functions made it confusing to see which bit was for what. I think this way makes a lot more sense:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    const xthreads = 5 // Number of worker goroutines, not counting main()
    
    func doSomething(a int) {
        fmt.Println("My job is", a)
    }
    
    func main() {
        // The channel is the job queue. Its buffer size does not need to relate to
        // xthreads; even an unbuffered channel works. The buffer only lets main
        // queue up jobs ahead of the workers.
        var ch = make(chan int, 50)
        var wg sync.WaitGroup
    
        // This starts xthreads goroutines that wait for something to do
        wg.Add(xthreads)
        for i := 0; i < xthreads; i++ {
            go func() {
                defer wg.Done()
                // range keeps receiving jobs until the channel is closed and
                // empty, then the loop ends and so does the goroutine
                for a := range ch {
                    doSomething(a) // do the thing
                }
            }()
        }
    
        // Now the jobs can be added to the channel, which is used as a queue
        for i := 0; i < 50; i++ {
            ch <- i // add i to the queue
        }
    
        close(ch) // This tells the goroutines there's nothing else to do
        wg.Wait() // Wait for the workers to finish
    }
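The pool above only prints from the workers. If the processed rows need to produce results back in the caller, one common extension is a second channel for results that is closed once `wg.Wait()` returns. The sketch below is illustrative rather than part of the original answer: it assumes a hypothetical `processTheThing` that returns a value (here it just doubles the id), uses an `ids` slice in place of the question's `rows.Next()` loop, and `process` and `workers` are made-up names.

```go
package main

import (
	"fmt"
	"sync"
)

const workers = 20 // the "x" from the question

// processTheThing is a hypothetical stand-in for the real per-row work;
// it doubles the id so the result is easy to check.
func processTheThing(id uint) uint {
	return id * 2
}

// process feeds ids to a fixed pool of workers and returns the sum of
// all results. Exactly `workers` goroutines do the processing.
func process(ids []uint) uint {
	jobs := make(chan uint)    // job queue shared by all workers
	results := make(chan uint) // workers report each result here
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs { // ends once jobs is closed and drained
				results <- processTheThing(id)
			}
		}()
	}

	// Close results only after every worker has exited, so the
	// collecting loop below knows when to stop.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Producer: in the question this would be the rows.Next()/rows.Scan loop.
	go func() {
		for _, id := range ids {
			jobs <- id
		}
		close(jobs)
	}()

	var total uint
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(process([]uint{1, 2, 3, 4, 5})) // prints 30
}
```

The producer runs in its own goroutine on purpose: with unbuffered `jobs` and `results` channels, sending jobs from the same goroutine that later collects results would deadlock as soon as there are more jobs than workers, because every worker would be blocked sending a result that nobody is receiving yet.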