postgresql | go | goroutine

Inserting items using goroutines does not work properly


I tried to insert items using 2000 goroutines, so I ran the code below:


package main

import (
    "fmt"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "log"
    "sync"
)

type Item struct {
    Id          int    `db:"id"`
    Title       string `db:"title"`
    Description string `db:"description"`
}

// ConnectPostgresDB -> connect postgres db
func ConnectPostgresDB() *sqlx.DB {
    connstring := "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080"
    db, err := sqlx.Open("postgres", connstring)
    if err != nil {
        fmt.Println(err)
        return db
    }
    return db
}

func InsertItem(item Item, wg *sync.WaitGroup) {
    defer wg.Done()
    db := ConnectPostgresDB()
    defer db.Close()
    tx, err := db.Beginx()
    if err != nil {
        fmt.Println(err)
        return
    }

    _, err = tx.Queryx("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
    if err != nil {
        fmt.Println(err)
    }

    err = tx.Commit()
    if err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("Data is Successfully inserted!!")
}

func main() {
    var wg sync.WaitGroup
    //db, err := sqlx.Connect("postgres", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080")
    for i := 1; i <= 2000; i++ {
        item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
        //go GetItem(db, i, &wg)
        wg.Add(1)
        go InsertItem(item, &wg)

    }
    wg.Wait()
    fmt.Println("All DB Connection is Completed")
}

After running the code above, I expected 2000 rows in the items table, but only about 150 rows exist.

The database server reported a "too many clients" error, so I increased max_connections from 100 to 4000 and tried again, but the result was the same.

Does anybody know why this happens?


Solution

  • Note

    All the code for the answer below is available under https://github.com/mwmahlberg/so-postgres.

    The problem is that you never really take note of the cause of the error: your code only prints errors and carries on, so the failed inserts go unnoticed.

    With a slightly adjusted main.go, the problem should become rather obvious:

    package main
    
    import (
        "flag"
        "fmt"
        "log"
        "os"
        "sync"
        "time"
    
        "github.com/jmoiron/sqlx"
        _ "github.com/lib/pq"
    )
    
    const schema = `
    CREATE TABLE IF NOT EXISTS items (
            id integer primary key,
            title text,
            description text
    );
    `
    
    type Item struct {
        Id          int    `db:"id"`
        Title       string `db:"title"`
        Description string `db:"description"`
    }
    
    var (
        // Make the waitgroup global: Easier to use and less error-prone
        wg sync.WaitGroup
    
        // Make the database URL a configurable flag
        dburl string
    )
    
    func init() {
        // Make the database URL a configurable flag
        flag.StringVar(&dburl, "dburl", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=5432", "Postgres DB URL")
    }
    
    // handlePanics is a simple function to log the error that caused a panic and exit the program
    func handlePanics() {
        if r := recover(); r != nil {
            log.Println("encountered panic: ", r)
            os.Exit(1)
        }
    }
    
    // InsertItem inserts an item into the database.
    // Note that the db is passed as an argument.
    func InsertItem(item Item, db *sqlx.DB) {
        defer wg.Done()
        // With the beginning of the transaction, a connection is acquired from the pool
        tx, err := db.Beginx()
        if err != nil {
            panic(fmt.Errorf("beginning transaction: %s", err))
        }
    
        _, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
        if err != nil {
            // the rollback is rather superfluous here
            // but it's good practice to include it
            tx.Rollback()
    
            // panic will cause the goroutine to exit and the waitgroup to decrement
            // Also, the handlePanics function will catch the panic and log the error
            panic(fmt.Errorf("inserting data %#v: %s", item, err))
        }
    
        err = tx.Commit()
        if err != nil {
            panic(fmt.Errorf("committing transaction: %s", err))
        }
    
        log.Printf("Inserted item with id %d\n", item.Id)
    }
    
    func main() {
    
        // Recover from panics and log the error for the main goroutine
        defer handlePanics()
    
        flag.Parse()
        log.Printf("DB URL: %s\n", dburl)
    
        var (
            db  *sqlx.DB
            err error
        )
    
        // Only open one connection to the database.
        // The postgres driver will open a pool of connections for you.
        if db, err = sqlx.Connect("postgres", dburl); err != nil {
            log.Fatalln(err)
        }
        defer db.Close()
    
        // Create the items table
        // Note that if this panics, the handlePanics function will catch it and log the error
        db.MustExec(schema)
    
        // maxOpen := db.Stats().MaxOpenConnections
        // var mutex sync.Mutex
        start := time.Now()
        for i := 1; i <= 2000; i++ {
    
            wg.Add(1)
    
            go func(i int) {
                // For goroutines, you must explicitly set the panic handler
                defer handlePanics()
                InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
            }(i)
        }
        wg.Wait()
        elapsed := time.Since(start)
        fmt.Printf("All DB Inserts completed after %s\n", elapsed)
    }
    

    And indeed the application logs an error in my test setup:

    2024/02/25 16:41:27 encountered panic:  beginning transaction: pq: sorry, too many clients already
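
    The underlying reason: database/sql does not cap the connection pool by default (a MaxOpenConns of 0 means "unlimited"), so 2000 concurrent Beginx calls each try to open their own server connection until Postgres refuses. You can watch this happen via db.Stats(); a minimal diagnostic sketch, assuming the db handle from the program above (the logging goroutine is illustrative, not part of the repository):

    // Drop this into main before the insert loop to observe the pool growing.
    go func() {
        for range time.Tick(100 * time.Millisecond) {
            s := db.Stats() // sql.DBStats: counters maintained by database/sql
            log.Printf("open=%d inUse=%d idle=%d waitCount=%d",
                s.OpenConnections, s.InUse, s.Idle, s.WaitCount)
        }
    }()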
    

    So, we need to add a control for that:

    
    // Set the number of connections in the pool
    db.DB.SetMaxOpenConns(10)
    
    // use the actual value
    maxOpen := db.DB.Stats().MaxOpenConnections
    
    var mutex sync.Mutex
    for i := 1; i <= 2000; i++ {
    
        wg.Add(1)
    
        // For goroutines, you must explicitly set the panic handler
        go func(i int) {
    
            defer handlePanics()
    
            // use a label to ensure that the goroutine breaks out of inner loop
        waitForOpenConnection:
            for {
                // Lock the mutex to check the number of open connections.
                // We need to do this otherwise another goroutine could increment the number of open connections
                mutex.Lock()
    
                // Get the connections in the pool that are currently in use
                switch open := db.DB.Stats().InUse; {
    
                // If the number of open connections is less than the maximum, insert the item
                case open <= maxOpen:
                    log.Println("Inserting item")
                    InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
                    // Now that the item has been inserted, unlock the mutex and break out of the inner loop
                    mutex.Unlock()
                    break waitForOpenConnection
                default:
                    // Allow other goroutines to read the number of open connections
                    mutex.Unlock()
                }
            }
        }(i)
    }
    

    And sure enough, the result is as expected:

    All DB Inserts completed after 514.022334ms
    

    Quite some hassle for something so deceptively simple, right?
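
    As an aside, a buffered channel used as a counting semaphore achieves the same throttling with less bookkeeping than the mutex-and-poll loop. A minimal sketch, assuming the same db, wg, handlePanics and InsertItem from above (not part of the linked repository):

    // At most cap(sem) goroutines hold a token (and thus a DB connection) at once.
    sem := make(chan struct{}, 10)
    for i := 1; i <= 2000; i++ {
        wg.Add(1)
        go func(i int) {
            defer handlePanics()
            sem <- struct{}{}        // acquire a slot; blocks while 10 inserts are in flight
            defer func() { <-sem }() // release the slot when this goroutine finishes
            InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
        }(i)
    }
    wg.Wait()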

    Now here comes the REALLY troubling part:

    "Concurrency is not parallelism."

    Go proverb

    If we have a look at the simplified version (full code in the aptly named "simplified" branch):

    package main
    
    ...
    
    const (
        schema = `
    CREATE TABLE IF NOT EXISTS items (
            id integer primary key,
            title text,
            description text
    );
    `
        insert = `
    INSERT INTO items(id, title, description) VALUES($1, $2, $3)
    `
    )
    
    ...
    
    // InsertItem inserts an item into the database.
    // Note that the db is passed as an argument.
    func InsertItem(item Item, db *sqlx.DB) {
    
        var (
            tx  *sqlx.Tx
            err error
        )
    
        // With the beginning of the transaction, a connection is acquired from the pool
        if tx, err = db.Beginx(); err != nil {
            panic(fmt.Errorf("beginning transaction: %s", err))
        }
    
        if _, err = tx.Exec(insert, item.Id, item.Title, item.Description); err != nil {
            // the rollback is rather superfluous here
            // but it's good practice to include it
            tx.Rollback()
    
            // panic will cause the goroutine to exit and the waitgroup to decrement
            // Also, the handlePanics function will catch the panic and log the error
            panic(fmt.Errorf("inserting data: %s", err))
        }
    
        if err = tx.Commit(); err != nil {
            panic(fmt.Errorf("committing transaction: %s", err))
        }
    
    }
    
    func main() {
    
        // Recover from panics and log the error for the main goroutine
        defer handlePanics()
    
        flag.Parse()
        log.Printf("DB URL: %s\n", dburl)
    
        var (
            db  *sqlx.DB
            err error
        )
    
        // Only open one connection to the database.
        // The postgres driver will open a pool of connections for you.
        if db, err = sqlx.Connect("postgres", dburl); err != nil {
            log.Fatalln(err)
        }
        defer db.Close()
    
        // Create the items table
        // Note that if this panics, the handlePanics function will catch it and log the error
        db.MustExec(schema)
        start := time.Now()
    
        // Set the number of connections in the pool
        db.DB.SetMaxOpenConns(10)
    
        for i := 1; i <= 2000; i++ {
            InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
        }
        log.Printf("All DB Inserts completed after %s\n", time.Since(start))
    }
    

    Not only is main much more readable and a lot less complicated, it is also of comparable speed. In fact, in my totally non-scientific tests, runs were on average about 100 ms faster than with the goroutine hassle.
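
    If raw insert throughput ever does become the bottleneck, wrapping the whole loop in a single transaction with a prepared statement is usually a far bigger lever than concurrency. A minimal sketch, assuming the same db, insert statement and schema as above (this variant is not part of the linked repository):

    // One transaction, one prepared statement, 2000 rows.
    tx, err := db.Beginx()
    if err != nil {
        log.Fatalln("beginning transaction:", err)
    }
    stmt, err := tx.Preparex(insert)
    if err != nil {
        tx.Rollback()
        log.Fatalln("preparing statement:", err)
    }
    for i := 1; i <= 2000; i++ {
        if _, err := stmt.Exec(i, "TestBook", "TestDescription"); err != nil {
            tx.Rollback()
            log.Fatalln("inserting row:", err)
        }
    }
    if err := tx.Commit(); err != nil {
        log.Fatalln("committing transaction:", err)
    }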

    Summary

    To make a very long story short: Conventional wisdom has it that premature optimization is the root of all evil. And concurrency is not parallelism.

    Do not use goroutines unless you have to; check your errors and react to them.
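
    "Check your errors" is worth spelling out: instead of printing with fmt.Println and carrying on, let the insert report failure to its caller. A minimal sketch of an error-returning variant (the name InsertItemE is illustrative, not from the repository):

    // InsertItemE returns the error instead of merely printing it, so the caller
    // can decide whether to retry, abort, or count failures.
    func InsertItemE(item Item, db *sqlx.DB) error {
        tx, err := db.Beginx()
        if err != nil {
            return fmt.Errorf("beginning transaction: %w", err)
        }
        if _, err := tx.Exec(insert, item.Id, item.Title, item.Description); err != nil {
            tx.Rollback()
            return fmt.Errorf("inserting item %d: %w", item.Id, err)
        }
        if err := tx.Commit(); err != nil {
            return fmt.Errorf("committing transaction: %w", err)
        }
        return nil
    }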