Search code examples
multithreading · go · goroutine

Go run loop in parallel with timeout


I need to run the requests in parallel — not one after another — but with a timeout. How can I do this in Go?

This is the specific code which I need to run in parallel, and the trick here is also to use the timeout: i.e. wait for all the requests according to the timeout, and collect the responses after all of them have finished.

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)

    }

This is the all code (working code) https://play.golang.org/p/cXnJJ6PW_CF

package main

import (
    `fmt`
    `net/http`
    `time`
)

// HT describes a named health-check tester: Name returns its label and
// Check performs the request, returning the response or an error.
type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

// testerResponse pairs a tester's label with the HTTP response it received.
type testerResponse struct {
    name string        // tester label, copied from Tap.name
    res  http.Response // response value (dereferenced copy, not a pointer)
}

// Tap is one health-check target: a named URL checked with a
// per-target request timeout.
type Tap struct {
    url     string
    name    string
    timeout time.Duration // NOTE(review): set in main's literals but never by NewTap — verify
    client  *http.Client  // client carrying the timeout, built by NewTap
}

// NewTap builds a Tap for the given name and url whose HTTP client
// aborts requests after timeout.
func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:     url,
        name:    name,
        timeout: timeout, // was dropped before, leaving the field at its zero value
        client:  &http.Client{Timeout: timeout},
    }
}

// Check performs a GET against p.url and returns a testerResponse
// carrying the tester's name. On failure the response still has the
// name set, but res is left as its zero value and the error is non-nil.
func (p *Tap) Check() (*testerResponse, error) {
    response := &testerResponse{name: p.name}
    req, err := http.NewRequest("GET", p.url, nil)
    if err != nil {
        return nil, err
    }
    res, err := p.client.Do(req)
    if err != nil {
        // was: *res was dereferenced before this check, panicking with a
        // nil pointer whenever the request (or its timeout) failed
        return response, err
    }
    // Close the body so the underlying connection can be reused.
    defer res.Body.Close()
    response.res = *res
    return response, nil
}

// Name returns the tester's label, satisfying the HT interface.
func (p *Tap) Name() string {
    return p.name
}

// main runs each tester sequentially (the question being asked is how
// to make this loop concurrent with a timeout).
func main() {

    var checkers []HT

    testers := []Tap{
        {
            name:    "first call",
            url:     "http://stackoverflow.com",
            timeout: time.Second * 20,
        },
        {
            name:    "second call",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            // was: execution fell through and dereferenced res, which may
            // be nil (or hold a zero-valued response) after an error
            fmt.Println(err)
            continue
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)

        checkers = append(checkers, checker)
    }
}

Solution

  • A popular concurrency pattern in Go is using worker pools.

    A basic worker pool uses two channels; one to put jobs on, and another to read results to. In this case, our jobs channel will be of type Tap and our results channel will be of type testerResponse.

    Workers

    take a job from the jobs channel and puts the result on the results channel.

    // worker consumes jobs until the jobs channel is closed, pushing
    // each check's outcome onto the results channel.
    func worker(jobs <-chan Tap, results chan<- testerResponse) {
        for {
            job, ok := <-jobs
            if !ok {
                return
            }
            results <- job.Check()
        }
    }
    

    Jobs

    to add jobs, we need to iterate over our testers and put them on our jobs channel.

    // makeJobs queues every Tap onto the jobs channel, then closes it so
    // ranging workers terminate instead of being leaked.
    func makeJobs(jobs chan<- Tap, taps []Tap) {
        for _, t := range taps {
            jobs <- t
        }
        // Only the sender closes a channel: this lets `for n := range jobs`
        // in each worker exit once the queue drains.
        close(jobs)
    }
    

    Results

    In order to read results, we need to iterate over them.

    // getResults reads exactly one result per tap from tr and prints a
    // status line for each; reading all of them blocks until every check
    // has finished.
    func getResults(tr <-chan testerResponse, taps []Tap) {
        for range taps {
            r := <-tr
            status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
            if r.err != nil {
                // was fmt.Sprintf(r.err.Error()): a non-constant format
                // string, flagged by go vet and broken if the error text
                // contains a '%'
                status = r.err.Error()
            }
            fmt.Println(status)
        }
    }
    

    Finally, our main function.

    // main wires up the pool: buffered jobs/results channels, a fixed set
    // of workers, then a blocking read of every result.
    func main() {
        // Buffer both channels to the job count so makeJobs never blocks.
        buffer := len(testers)
        jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
        resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
    
        // Create the worker pool; a loop replaces the five copy-pasted
        // `go worker(...)` statements.
        const maxWorkers = 5
        for i := 0; i < maxWorkers; i++ {
            go worker(jobsPipe, resultsPipe)
        }
    
        makeJobs(jobsPipe, testers)
        getResults(resultsPipe, testers)
    }
    

    Putting it all together

    To see the timeout in action, lower one of the timeouts (for example to one millisecond) — the code listed below uses 10/20-second timeouts, and the sample output at the end shows the context-deadline error produced by a too-short timeout.

    package main
    
    import (
        "fmt"
        "net/http"
        "time"
    )
    
    // HT describes a named checker. NOTE(review): Check's signature here is
    // aligned with (*Tap).Check below, which returns a testerResponse value
    // (the error travels inside it); the earlier two-value form no longer
    // matched, so *Tap stopped satisfying HT.
    type HT interface {
        Name() string
        Check() testerResponse
    }
    
    // testerResponse is the outcome of one check: either err is set, or
    // name/res/url describe a completed request.
    type testerResponse struct {
        err  error         // non-nil when the GET failed (e.g. client timeout)
        name string        // tester label, copied from Tap.name
        res  http.Response // response value; zero-valued when err is set
        url  string        // URL that was fetched
    }
    
    // Tap is one health-check target; timeout bounds the whole request
    // made on its behalf by Check.
    type Tap struct {
        url     string
        name    string
        timeout time.Duration // read by Check when building the HTTP client
        client  *http.Client  // NOTE(review): nil for Taps built via the testers literal
    }
    
    // NewTap builds a Tap whose client times out after the given duration.
    func NewTap(name, url string, timeout time.Duration) *Tap {
        return &Tap{
            url:     url,
            name:    name,
            timeout: timeout, // previously dropped: Check reads p.timeout, which stayed zero (no timeout)
            client:  &http.Client{Timeout: timeout},
        }
    }
    
    // Check fetches p.url with a client honoring p.timeout and returns the
    // outcome as a testerResponse (err set on failure, res/url on success).
    func (p *Tap) Check() testerResponse {
        fmt.Printf("Fetching %s %s \n", p.name, p.url)
        // Reuse the existing client when there is one; the previous version
        // built a whole new Tap (and http.Client) on every call, defeating
        // connection reuse.
        client := p.client
        if client == nil {
            client = &http.Client{Timeout: p.timeout}
        }
        res, err := client.Get(p.url)
        if err != nil {
            return testerResponse{err: err}
        }
    
        // need to close body
        res.Body.Close()
        return testerResponse{name: p.name, res: *res, url: p.url}
    }
    
    // Name returns the tester's label, satisfying the HT interface.
    func (p *Tap) Name() string {
        return p.name
    }
    
    // makeJobs queues every Tap onto the jobs channel, then closes it so
    // the ranging workers terminate instead of being leaked.
    func makeJobs(jobs chan<- Tap, taps []Tap) {
        for _, t := range taps {
            jobs <- t
        }
        // Only the sender closes a channel: this lets `for n := range jobs`
        // in each worker exit once the queue drains.
        close(jobs)
    }
    
    // getResults reads exactly one result per tap from tr and prints a
    // status line for each; reading all of them blocks main until every
    // check has finished.
    func getResults(tr <-chan testerResponse, taps []Tap) {
        for range taps {
            r := <-tr
            status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
            if r.err != nil {
                // was fmt.Sprintf(r.err.Error()): non-constant format string
                // (go vet), and broken if the error text contains '%'
                status = r.err.Error() + "\n"
            }
            // was fmt.Printf(status): same vet issue; Print never interprets
            // '%' in the already-formatted status text
            fmt.Print(status)
        }
    }
    
    // worker drains the jobs channel, running each Tap's Check and
    // forwarding the outcome to results; it returns once jobs is closed.
    func worker(jobs <-chan Tap, results chan<- testerResponse) {
        for {
            job, ok := <-jobs
            if !ok {
                return
            }
            results <- job.Check()
        }
    }
    
    // testers is the fixed work list: fourteen targets cycling through a
    // handful of public URLs with 10- or 20-second timeouts.
    var (
        testers = []Tap{
            {
                name:    "1",
                url:     "http://google.com",
                timeout: time.Second * 20,
            },
            {
                name:    "2",
                url:     "http://www.yahoo.com",
                timeout: time.Second * 10,
            },
            {
                name:    "3",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "4",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
            {
                name:    "5",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "6",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
            {
                name:    "7",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "8",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
            {
                name:    "9",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "10",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
            {
                name:    "11",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "12",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
            {
                name:    "13",
                url:     "http://stackoverflow.com",
                timeout: time.Second * 20,
            },
            {
                name:    "14",
                url:     "http://www.example.com",
                timeout: time.Second * 10,
            },
        }
    )
    
    // main wires up the pool: buffered jobs/results channels, a fixed set
    // of workers, then a blocking read of every result.
    func main() {
        // Buffer both channels to the job count so makeJobs never blocks.
        buffer := len(testers)
        jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
        resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
    
        // Create the worker pool; a loop replaces the five copy-pasted
        // `go worker(...)` statements.
        const maxWorkers = 5
        for i := 0; i < maxWorkers; i++ {
            go worker(jobsPipe, resultsPipe)
        }
    
        makeJobs(jobsPipe, testers)
        getResults(resultsPipe, testers)
    }
    
    

    Which outputs:

    // Fetching http://stackoverflow.com 
    // Fetching http://www.example.com 
    // Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
    // 'first call' to 'http://stackoverflow.com' was fetched with status '200'