Unexpected behaviour of time.Now() in goroutine


As a way of familiarizing myself with Go I am trying to build a (completely unreliable) random number generator. The idea is to time 100 GET requests to some URL, do something with the results and produce a "random" number.
I was interested in seeing whether the code would run faster when the requests are done by goroutines coordinated through a wait group. The answer seems to be yes, but when printing the timings of the individual requests, the goroutine version shows an interesting pattern.
Sequential timings of the GET requests in microseconds:
[25007 30502 25594 40417 31505 18502 20503 19034 19473 18001 36507 25004 28005 19004 20502 20503 20503 20504 20002 19003 20511 18494 20003 21004 20003 20502 20504 19002 19004 21506 29501 30005 31005 21504 20054 22452 19503 19503 20003 19503 21004 18501 18003 20003 20003 19003 19503 20003 23504 18003 20003 19503 19502 19003 20003 20003 20040 21010 18959 20503 34251 27260 30504 25004 22004 20502 20003 19503 20502 20504 19503 22003 19003 19003 20003 20002 18003 19503 19003 18503 20504 18552 18953 18002 20003 19004 21002 18503 20503 19503 20504 20003 20003 21003 46050 19504 18503 19503 19503 19002]

Goroutine timings of the GET requests in microseconds:
[104518 134570 157528 187533 193535 193535 208036 211041 220039 220242 252044 252044 258045 258045 258045 258045 271047 282050 282050 282050 286050 287050 289051 296052 297552 300052 300678 305553 307053 308054 310556 311069 312055 312555 324056 329558 334559 339559 346061 353562 360563 369564 375065 377566 384067 393569 397069 402570 410072 416572 420573 425574 431076 437576 443078 446577 453579 458580 465081 474583 480584 488085 496122 505588 510589 515590 520591 526592 533593 538596 544595 549596 555097 563098 569600 575100 584101 589604 595604 604106 610606 620609 634111 640611 645613 653119 656616 663116 669117 674118 681119 696122 709123 723627 735629 747631 757632 769635 779137 785139]
The timings for the goroutine calls are incremental, while the regular sequential timings look as expected. I suspected this might have something to do with time.Now() being evaluated once for all goroutines, but shuffling that call around did not change the results.
This is what I have so far; I know entropy is not a good measure of randomness, but I included it anyway because of reasons :)
First the goroutines are run, then the sequential version. Lastly, the timings and some other stuff get printed.

package main

import (
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"

    "github.com/montanaflynn/stats"
)

func doGet(address string, channel chan int, wg *sync.WaitGroup) {
    // do get request as part of a waitgroup
    // defer wg signal
    defer wg.Done()
    // start time
    startTime := time.Now()
    _, err := http.Get(address)
    if err != nil {
        log.Fatalln(err)
    }
    // get time since start
    delta := int(time.Since(startTime).Microseconds())
    channel <- delta
}

func doGetNoWg(address string) int {
    // do get request without a waitgroup/channel
    start := time.Now()
    _, err := http.Get(address)
    if err != nil {
        log.Fatalln(err)
    }
    return int(time.Since(start).Microseconds())
}

func main() {
    var wg sync.WaitGroup
    // initialize arrays for the timings
    var timings_parallel [100]int
    var timings_sequential [100]int
    // get a small uniform set for comparison of entropy
    zeroes := []int{1, 1, 1}
    // get a random set for comparison of entropy
    var randnrs [100]int
    for i := 0; i < len(randnrs); i++ {
        randnrs[i] = rand.Intn(250)
    }
    // start
    start := time.Now()
    ch := make(chan int, 100)
    url := "https://www.nu.nl"
    wg.Add(100)
    for i, _ := range timings_parallel {
        // can this be done without dummy assignment or explicit counters?
        i = i
        go doGet(url, ch, &wg)
    }
    wg.Wait()
    close(ch)
    // feed the results from the channel into the result array
    count := 0
    for ret := range ch {
        timings_parallel[count] = ret
        count++
    }
    // get total running time for this part
    time_parallel := time.Since(start).Milliseconds()

    // start of the sequential part
    start = time.Now()
    for i, _ := range timings_sequential {
        timings_sequential[i] = doGetNoWg(url)
    }
    // end sequential part. Why was I using goroutines again? :P
    time_sequential := time.Since(start).Milliseconds()
    
    // calculate entropy
    entropy, _ := stats.Entropy(stats.LoadRawData(timings_parallel[:]))
    entropy_equal, _ := stats.Entropy(stats.LoadRawData(zeroes[:]))
    entropy_random, _ := stats.Entropy(stats.LoadRawData(randnrs[:]))

    // print out stuffs
    fmt.Print("Parallel: ")
    fmt.Printf("%v\n", timings_parallel)
    fmt.Print("Sequential: ")
    fmt.Printf("%v\n", timings_sequential)
    fmt.Printf("Entropy equal: %v\n", entropy_equal)
    fmt.Printf("Entropy random: %v\n", entropy_random)
    fmt.Printf("Entropy: %v\n", entropy)
    fmt.Printf("Time elapsed parallel: %v\n", time_parallel)
    fmt.Printf("Time elapsed sequential: %v", time_sequential)

}
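
(A note on the code above: http.Get returns a response whose Body should be drained and closed; discarding the whole response as _, err := http.Get(...) means the underlying connections can never be reused, which by itself can skew the timings. A minimal sketch of the usual pattern, needing the io package imported:)

resp, err := http.Get(address)
if err != nil {
    log.Fatalln(err)
}
// drain and close the body so the underlying connection can be reused
io.Copy(io.Discard, resp.Body)
resp.Body.Close()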

Example output (sans the timing arrays):

Entropy equal: 1.0986122886681096
Entropy random: 4.39737296171013
Entropy: 4.527705829831552
Time elapsed parallel: 786
Time elapsed sequential: 2160

So the goroutine part seems a lot faster, while the individual timings are much higher. Does anybody have an idea how to get the timings right (or why they come out the way they do)?

===== Update
The last timing of the goroutines is pretty much always equal to, or a millisecond below, the total time measured in "Time elapsed parallel".

===== Update 2
The problem seems to be that the first call to time.Now() always yields the same time, while the second time.Now() works fine. At least this explains the results:

GOstart: 2022-04-05 18:47:06.3117452 +0200 CEST m=+0.004000601
GOstop: 2022-04-05 18:47:06.4736105 +0200 CEST m=+0.165865901
GOstart: 2022-04-05 18:47:06.3117452 +0200 CEST m=+0.004000601
GOstop: 2022-04-05 18:47:06.4736105 +0200 CEST m=+0.165865901
...
GOstart: 2022-04-05 18:47:06.3117452 +0200 CEST m=+0.004000601
GOstop: 2022-04-05 18:47:06.6234215 +0200 CEST m=+0.315676901


Solution

  • The cause of this behaviour lies in the Go scheduler (a shorter version of this question was posted at golang-nuts). The goroutines all start executing at the same point in time (as the timings suggest, and inspecting the memory location of the startTime variable shows that the time value is not "recycled"), but they are descheduled once they hit http.Get(). The timings are incremental because http.Get() creates a bottleneck that does not allow the spawned goroutines to actually run concurrently; it appears some sort of FIFO queue is used here. A minimal sketch after the reading list below reproduces the effect without any HTTP.
    Recommended watching and reading:
    Explaining the Golang I/O multiplexing netpoller model
    Queues, Fairness and the Go scheduler
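
    To make the queueing effect concrete, here is a minimal, HTTP-free sketch (not part of the original program): every goroutine records its own startTime, but they all have to take turns on a single-slot "resource" that stands in for the request bottleneck, so the measured deltas come out incremental just like the GET timings above.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        slot := make(chan struct{}, 1) // only one "request" in flight at a time
        results := make(chan time.Duration, 20)

        for i := 0; i < 20; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                start := time.Now()               // every goroutine gets its own start time...
                slot <- struct{}{}                // ...but then waits for its turn here
                time.Sleep(20 * time.Millisecond) // the "request"
                <-slot
                results <- time.Since(start) // includes the time spent waiting in line
            }()
        }
        wg.Wait()
        close(results)

        // the printed durations grow in roughly 20 ms steps, mirroring the incremental GET timings
        for d := range results {
            fmt.Println(d.Milliseconds(), "ms")
        }
    }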

    Playing around with waitgroup sizes I found some values that gave much more consistent timings (instead of incremental ones), so I was wondering what the impact of waitgroup size on the total and individual timings is. I refactored the above into a program that runs a number of experiments per waitgroup size in a given range and persists the total and individual timings per run to an SQLite database. The resulting dataset can easily be analysed in e.g. a Jupyter notebook (or straight from Go; see the query sketch after the program below). With the current settings I have unfortunately only been able to get to about 40K requests before getting throttled. See my GitHub for some datasets if you are interested but do not want to wait for data, as a full run takes quite long to finish. Fun result: a steep decline in the concurrent/sequential ratio for small waitgroup sizes, and at the end you can see where the connection starts getting throttled. That run was aborted manually at that point.
    Plot: concurrent running time / sequential running time vs. waitgroup size.

    Some plots of the individual timings for different waitgroup sizes.

    Plots: individual timings for waitgroup sizes 1, 10, 50, 80 and 100.

    package main
    
    import (
        "database/sql"
        "fmt"
        "log"
        "net/http"
        "os"
        "path/filepath"
        "runtime"
        "sync"
        "time"
    
        _ "github.com/mattn/go-sqlite3"
    )
    
    ///// global vars
    const REQUESTS int = 100           // Single run size, performed two times (concurrent and sequential)
    const URL string = "SET_YOUR_OWN" // Some file on a CDN somewhere; used for the GET requests
    const DBNAME string = "netRand.db" // Name of the db file. Saved next to the executable
    const WGMIN int = 1                // Start range for waitgroup size (inclusive)
    const WGMAX int = 101              // Stop range for waitgroup size (exclusive)
    const NREPEAT int = 10             // Number of times to repeat a run for a specific waitgroup size
    
    //// types
    type timingResult struct {
        // Container for collecting results before persisting to DB
        WaitgroupSize       int
        ConcurrentTimingsMs [REQUESTS]int64
        ConcurrentTotalMs   int64
        SequentialTimingsMs [REQUESTS]int64
        SequentialTotalMs   int64
    }
    
    //// main
    func main() {
        db := setupDb()
        defer db.Close()
        for i := WGMIN; i < WGMAX; i++ {
            // waitgroup size range
            for j := 0; j < NREPEAT; j++ {
                // repeat for more data points
                timings := requestTimes(i)
                persistTimings(timings, db)
                fmt.Printf("\n======== %v of %v ============\n", j+1, NREPEAT)
                fmt.Printf("current waitgroup size: %v\n", i)
                fmt.Printf("max waitgroup size: %v\n", WGMAX-1)
            }
        }
    
    }
    
    func requestTimes(waitgroupSize int) timingResult {
        // do REQUESTS requests in goroutines with the given waitgroup size
        // do REQUESTS requests sequentially
    
        timings_concurrent, total_concurrent := concurrentRequests(waitgroupSize)
        timings_sequential, total_sequential := sequentialRequests()
    
        return timingResult{
            WaitgroupSize:       waitgroupSize,
            ConcurrentTimingsMs: timings_concurrent,
            ConcurrentTotalMs:   total_concurrent,
            SequentialTimingsMs: timings_sequential,
            SequentialTotalMs:   total_sequential,
        }
    
    }
    func persistTimings(timings timingResult, db *sql.DB) {
        persistRun(timings, db)
        currentRunId := getCurrentRunId(db)
        persistConcurrentTimings(currentRunId, timings, db)
        persistSequentialTimings(currentRunId, timings, db)
    }
    func concurrentRequests(waitgroupSize int) ([REQUESTS]int64, int64) {
        start := time.Now()
    
        var wg sync.WaitGroup
        var timings [REQUESTS]int64
        ch := make(chan int64, REQUESTS)
    
        for i := range timings {
            wg.Add(1)
            go func() {
                defer wg.Done()
                doGetChannel(URL, ch)
            }()
            // waitgroup size is controlled using modulo,
            // making sure the experiment size is always REQUESTS
            // independent of the waitgroup size
            if i%waitgroupSize == 0 {
                wg.Wait()
            }
        }
        wg.Wait()
        close(ch)
    
        count := 0
        for ret := range ch {
            timings[count] = ret
            count++
        }
    
        return timings, time.Since(start).Milliseconds()
    }
    func doGetChannel(address string, channel chan int64) {
        // time get request and send to channel
        startSub := time.Now().UnixMilli()
        _, err := http.Get(address)
        if err != nil {
            log.Fatalln(err)
        }
        stopSub := time.Now().UnixMilli()
        delta := stopSub - startSub
        channel <- delta
    }
    func sequentialRequests() ([REQUESTS]int64, int64) {
        startGo := time.Now()
        var timings_sequential [REQUESTS]int64
        for i := range timings_sequential {
            timings_sequential[i] = doGetReturn(URL)
        }
        return timings_sequential, time.Since(startGo).Milliseconds()
    }
    func doGetReturn(address string) int64 {
        // time get request without a waitgroup/channel
        start := time.Now()
        _, err := http.Get(address)
        if err != nil {
            log.Fatalln(err)
        }
        duration := time.Since(start).Milliseconds()
        return duration
    }
    
    //// DB
    func setupDb() *sql.DB {
        //      __________________________runs____________________
        //     |                                                  |
        // concurrent_timings(fk: run_id)         sequential_timings(fk: run_id)
        //
        const createRuns string = `
        CREATE TABLE IF NOT EXISTS runs (
        run_id INTEGER NOT NULL PRIMARY KEY,
        time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        waitgroup_size INTEGER,
        concurrent_total_ms INTEGER,
        sequential_total_ms INTEGER,
        concurrent_sequential_ratio REAL
        );`
    
        const createSequentialTimings string = `
        CREATE TABLE IF NOT EXISTS sequential_timings (
        run INTEGER,
        call_number INTEGER,
        timing_ms INTEGER,
        FOREIGN KEY(run) REFERENCES runs(run_id)
        );`
    
        const createConcurrentTimings string = `
        CREATE TABLE IF NOT EXISTS concurrent_timings (
        run INTEGER,
        channel_position INTEGER,
        timing_ms INTEGER,
        FOREIGN KEY(run) REFERENCES runs(run_id)
        );`
        // retrieve platform appropriate connection string
        dbString := getConnectionString(DBNAME)
        db, err := sql.Open("sqlite3", dbString)
        if err != nil {
            log.Fatalln(err)
        }
        if _, err := db.Exec(createRuns); err != nil {
            log.Fatalln(err)
        }
        if _, err := db.Exec(createSequentialTimings); err != nil {
            log.Fatalln(err)
        }
        if _, err := db.Exec(createConcurrentTimings); err != nil {
            log.Fatalln(err)
        }
        return db
    }
    func getConnectionString(dbName string) string {
        // Generate platform appropriate connection string
        // the db is placed in the same directory as the current executable
    
        // retrieve the path to the currently executed executable
        ex, err := os.Executable()
        if err != nil {
            panic(err)
        }
        // retrieve path to containing dir
        dbDir := filepath.Dir(ex)
    
        // Append platform appropriate separator and dbName
        if runtime.GOOS == "windows" {
    
            dbDir = dbDir + "\\" + dbName
    
        } else {
            dbDir = dbDir + "/" + dbName
        }
        return dbDir
    }
    func persistRun(timings timingResult, db *sql.DB) {
        tx, err := db.Begin()
        if err != nil {
            log.Fatalln(err)
        }
    
        insertRun, err := db.Prepare(`INSERT INTO runs(
            waitgroup_size, 
            sequential_total_ms, 
            concurrent_total_ms, 
            concurrent_sequential_ratio) 
            VALUES(?, ?, ?, ?)`)
    
        if err != nil {
            log.Fatalln(err)
        }
        defer tx.Stmt(insertRun).Close()
        _, err = tx.Stmt(insertRun).Exec(
            timings.WaitgroupSize,
            timings.SequentialTotalMs,
            timings.ConcurrentTotalMs,
            float32(timings.ConcurrentTotalMs)/float32(timings.SequentialTotalMs),
        )
        if err != nil {
            log.Fatalln(err)
        }
        err = tx.Commit()
    
        if err != nil {
            log.Fatalln(err)
        }
    }
    
    func getCurrentRunId(db *sql.DB) int {
        rows, err := db.Query("SELECT MAX(run_id) FROM runs")
        if err != nil {
            log.Fatal(err)
        }
        var run_id int
        for rows.Next() {
            err = rows.Scan(&run_id)
            if err != nil {
                log.Fatalln(err)
            }
        }
        rows.Close()
        return run_id
    }
    func persistConcurrentTimings(runId int, timings timingResult, db *sql.DB) {
        tx, err := db.Begin()
        if err != nil {
            log.Fatalln(err)
        }
    
        insertTiming, err := db.Prepare(`INSERT INTO concurrent_timings(
            run, 
            channel_position, 
            timing_ms) 
            VALUES(?, ?, ?)`)
    
        if err != nil {
            log.Fatalln(err)
        }
        for i, timing := range timings.ConcurrentTimingsMs {
            _, err = tx.Stmt(insertTiming).Exec(
                runId,
                i,
                timing,
            )
            if err != nil {
                log.Fatalln(err)
            }
        }
    
        err = tx.Commit()
    
        if err != nil {
            log.Fatalln(err)
        }
    }
    func persistSequentialTimings(runId int, timings timingResult, db *sql.DB) {
        tx, err := db.Begin()
        if err != nil {
            log.Fatalln(err)
        }
    
        insertTiming, err := db.Prepare(`INSERT INTO sequential_timings(
            run, 
            call_number, 
            timing_ms) 
            VALUES(?, ?, ?)`)
    
        if err != nil {
            log.Fatalln(err)
        }
        for i, timing := range timings.SequentialTimingsMs {
            _, err = tx.Stmt(insertTiming).Exec(
                runId,
                i,
                timing,
            )
            if err != nil {
                log.Fatalln(err)
            }
        }
    
        err = tx.Commit()
    
        if err != nil {
            log.Fatalln(err)
        }
    }
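
    Once a few runs are in the database, the ratio data behind the plot above can also be pulled straight from Go. A minimal sketch, assuming the runs table created by setupDb above (adjust the database path to wherever the file ended up next to the executable):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/mattn/go-sqlite3"
    )

    func main() {
        db, err := sql.Open("sqlite3", "netRand.db")
        if err != nil {
            log.Fatalln(err)
        }
        defer db.Close()

        // average concurrent/sequential ratio per waitgroup size
        rows, err := db.Query(`
            SELECT waitgroup_size, AVG(concurrent_sequential_ratio)
            FROM runs
            GROUP BY waitgroup_size
            ORDER BY waitgroup_size`)
        if err != nil {
            log.Fatalln(err)
        }
        defer rows.Close()

        for rows.Next() {
            var size int
            var ratio float64
            if err := rows.Scan(&size, &ratio); err != nil {
                log.Fatalln(err)
            }
            fmt.Printf("waitgroup size %3d: avg ratio %.3f\n", size, ratio)
        }
    }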