Tags: asynchronous, go, redis, beanstalkd

Go queue processing with retry on failure


We have a bunch of files that need to be uploaded to a remote blob store after processing.

Currently, the frontend (PHP) creates a Redis list of such files and gives it a unique ID, called JobID. It then passes that ID to a beanstalkd tube, from which a Go process receives it. The Go process uses a library called Go workers to handle each job ID much as net/http handles requests: it receives the job ID, retrieves the Redis list, and starts processing the files.
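The current sequential flow can be pictured as one handler per job ID that walks the file list one upload at a time. This is a minimal sketch, not the Go workers API: the map `fakeRedis` stands in for the Redis list, and `fetchFiles`, `upload`, and `handleJob` are hypothetical names. Stopping at the first failed upload is also an assumption.

```go
package main

import (
	"fmt"
)

// fakeRedis stands in for the Redis list the PHP frontend creates,
// keyed by JobID (hypothetical; the real system reads from Redis).
var fakeRedis = map[string][]string{
	"job-42": {"a.jpg", "b.jpg", "c.jpg"},
}

// fetchFiles is a hypothetical helper returning the file list for a job ID.
func fetchFiles(jobID string) ([]string, error) {
	files, ok := fakeRedis[jobID]
	if !ok {
		return nil, fmt.Errorf("job %q not found", jobID)
	}
	return files, nil
}

// upload is a hypothetical placeholder for the blob-store upload.
func upload(file string) error {
	fmt.Println("uploaded", file)
	return nil
}

// handleJob mirrors the current behaviour: one file at a time.
// It returns how many files were uploaded before any error.
func handleJob(jobID string) (int, error) {
	files, err := fetchFiles(jobID)
	if err != nil {
		return 0, err
	}
	done := 0
	for _, f := range files {
		if err := upload(f); err != nil {
			return done, err // assumption: stop at the first failure
		}
		done++
	}
	return done, nil
}

func main() {
	n, err := handleJob("job-42")
	fmt.Println(n, err) // 3 <nil>
}
```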

However, only one file is processed at a time. Since the operation is I/O-bound rather than CPU-bound, intuition suggests it would be beneficial to use one goroutine per file.

However, we also want to retry uploads on failure and track the number of items processed per job. We cannot start an unbounded number of goroutines, because a single job can contain ~10k files and hundreds of such jobs can arrive per second at peak times. What would be the correct approach?

NB: We can change the technology stack a bit if needed (for example, swapping out beanstalkd for something else).


Solution

  • You can limit the number of goroutines with a buffered channel whose capacity is the maximum number of goroutines you want running at once. Sending to the channel blocks once it is full; as each goroutine finishes, it receives from the channel, freeing a slot so a new goroutine can start.

    Example:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    var (
        concurrent    = 5
        semaphoreChan = make(chan struct{}, concurrent)
    )
    
    func doWork(wg *sync.WaitGroup, item int) {
        // block while full
        semaphoreChan <- struct{}{}
    
        go func() {
            defer func() {
                // read to release a slot
                <-semaphoreChan
                wg.Done()
            }()
            // This is where your work actually gets done
            fmt.Println(item)
        }()
    }
    
    func main() {
        // we need this for the example so that we can block until all goroutines finish
        var wg sync.WaitGroup
        wg.Add(10)
    
        // start the work
        for i := 0; i < 10; i++ {
            doWork(&wg, i)
        }
    
        // block until all work is done
        wg.Wait()
    }
    

    Go Playground link: https://play.golang.org/p/jDMYuCe7HV

    Inspired by this Golang UK Conference talk: https://youtu.be/yeetIgNeIkc?t=1413