I tried to insert items using goroutines (2000 goroutines),
so I ran the code below:
package main
import (
"fmt"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"log"
"sync"
)
// Item mirrors one row of the items table; the db struct tags map
// each field to its column name for sqlx.
type Item struct {
Id int `db:"id"`
Title string `db:"title"`
Description string `db:"description"`
}
// ConnectPostgresDB opens a *sqlx.DB handle for the local postgres
// instance and returns it.
//
// NOTE(review): sqlx.Open only validates its arguments; it does not
// dial the server, so err is almost always nil here and connection
// failures surface later (e.g. in Beginx). Consider sqlx.Connect,
// which also pings the server.
func ConnectPostgresDB() *sqlx.DB {
	connstring := "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080"
	db, err := sqlx.Open("postgres", connstring)
	if err != nil {
		// Log (with timestamp) instead of fmt.Println; this also uses
		// the otherwise-unused "log" import. The original had an error
		// branch that returned db exactly like the success path, so
		// the branch was effectively dead weight.
		log.Println(err)
	}
	return db
}
// InsertItem inserts one item inside its own transaction and signals
// completion on the wait group.
//
// NOTE(review): opening a fresh *sqlx.DB per goroutine defeats the
// driver's built-in connection pooling and is what exhausts the
// server's max_connections; the handle should be shared by callers.
func InsertItem(item Item, wg *sync.WaitGroup) {
	defer wg.Done()
	db := ConnectPostgresDB()
	defer db.Close()
	tx, err := db.Beginx()
	if err != nil {
		fmt.Println(err)
		return
	}
	// Bug fix: the original called tx.Queryx for an INSERT and never
	// closed the returned rows, leaking the underlying connection.
	// Exec is the correct call for statements that return no rows.
	_, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
	if err != nil {
		// Bug fix: the original fell through and committed a failed
		// transaction; roll back and bail out instead.
		tx.Rollback()
		fmt.Println(err)
		return
	}
	err = tx.Commit()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("Data is Successfully inserted!!")
}
func main() {
var wg sync.WaitGroup
//db, err := sqlx.Connect("postgres", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080")
for i := 1; i <= 2000; i++ {
item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
//go GetItem(db, i, &wg)
wg.Add(1)
go InsertItem(item, &wg)
}
wg.Wait()
fmt.Println("All DB Connection is Completed")
}
After running the code above, I expected 2000 rows in the items table.
But the items table contains only about 150 rows.
The DB server reported a "too many clients" error, so I increased max_connections from 100 to 4000
and tried again.
But the result was the same.
Does anybody know why this happens?
Note
All the code for the answer below is available under https://github.com/mwmahlberg/so-postgres.
The problem is that you never actually inspect the cause of the error.
With a slightly adjusted main.go
, the problem should become rather obvious:
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// schema is run once at startup (db.MustExec) and creates the items
// table if it does not already exist.
const schema = `
CREATE TABLE IF NOT EXISTS items (
id integer primary key,
title text,
description text
);
`
// Item mirrors one row of the items table; the db struct tags map
// each field to its column name for sqlx.
type Item struct {
Id int `db:"id"`
Title string `db:"title"`
Description string `db:"description"`
}
var (
	// wg is global on purpose: both main and InsertItem touch it, so
	// keeping it package-level avoids threading it through every call.
	wg sync.WaitGroup
	// dburl is the Postgres connection string, overridable via the
	// -dburl flag registered in init.
	dburl string
)
// init registers the -dburl flag so the connection string can be
// overridden on the command line; main calls flag.Parse.
func init() {
	const defaultDBURL = "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=5432"
	flag.StringVar(&dburl, "dburl", defaultDBURL, "Postgres DB URL")
}
// handlePanics recovers a panic in the current goroutine, logs the
// recovered value, and terminates the process with exit code 1.
// It must be installed with defer; a direct call is a no-op.
func handlePanics() {
	r := recover()
	if r == nil {
		return
	}
	log.Println("encountered panic: ", r)
	os.Exit(1)
}
// InsertItem writes one item to the database inside its own
// transaction. The shared *sqlx.DB is passed in so every goroutine
// draws its connection from a single pool.
func InsertItem(item Item, db *sqlx.DB) {
	defer wg.Done()
	var (
		tx  *sqlx.Tx
		err error
	)
	// Beginx checks a connection out of the pool.
	if tx, err = db.Beginx(); err != nil {
		panic(fmt.Errorf("beginning transaction: %s", err))
	}
	if _, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description); err != nil {
		// Rollback is largely superfluous here, but it releases the
		// connection promptly and is good practice regardless.
		tx.Rollback()
		// The panic unwinds through the deferred wg.Done above and is
		// caught by the caller's deferred handlePanics.
		panic(fmt.Errorf("inserting data %#v: %s", item, err))
	}
	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %s", err))
	}
	log.Printf("Inserted item with id %d\n", item.Id)
}
// main parses flags, connects a single shared pool, ensures the
// schema exists, and fires 2000 concurrent inserts, timing the run.
func main() {
	// Log the cause of any panic raised on the main goroutine.
	defer handlePanics()
	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// Open exactly one *sqlx.DB; the driver maintains the connection
	// pool behind it.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// Create the items table; MustExec panics on failure, which the
	// deferred handlePanics above turns into a logged exit.
	db.MustExec(schema)

	start := time.Now()
	for i := 1; i <= 2000; i++ {
		wg.Add(1)
		item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
		go func(it Item) {
			// Goroutines need their own deferred panic handler; the
			// one in main does not cover them.
			defer handlePanics()
			InsertItem(it, db)
		}(item)
	}
	wg.Wait()

	fmt.Printf("All DB Inserts completed after %s\n", time.Since(start))
}
And indeed the application logs an error in my test setup:
2024/02/25 16:41:27 encountered panic: beginning transaction: pq: sorry, too many clients already
So, we need to add a control for that:
// Cap the pool so at most 10 server connections are ever open.
db.DB.SetMaxOpenConns(10)
// Read the limit back from the pool rather than repeating the literal.
maxOpen := db.DB.Stats().MaxOpenConnections
// mutex guards the check-then-insert sequence below.
// NOTE(review): the lock is held across the entire InsertItem call,
// so the 2000 goroutines end up performing their inserts strictly one
// at a time — confirm that is intended.
var mutex sync.Mutex
for i := 1; i <= 2000; i++ {
wg.Add(1)
// Goroutines must install their own deferred panic handler.
go func(i int) {
defer handlePanics()
// The label lets break leave the retry loop rather than the switch.
waitForOpenConnection:
for {
// Lock before reading the pool stats; otherwise another goroutine
// could grab a connection between our check and our insert.
mutex.Lock()
// Number of pool connections currently in use.
switch open := db.DB.Stats().InUse; {
// Room in the pool: perform the insert.
// NOTE(review): when open == maxOpen the pool is already full, so
// `open <= maxOpen` admits one caller that will block in Beginx;
// `<` may be what is meant here — confirm.
case open <= maxOpen:
log.Println("Inserting item")
InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
// Insert done: release the lock and leave the retry loop.
mutex.Unlock()
break waitForOpenConnection
default:
// Pool exhausted: release the lock so others can re-check.
// NOTE(review): this retry loop is a busy-wait spin.
mutex.Unlock()
}
}
}(i)
}
And sure enough, the result is as expected:
All DB Inserts completed after 514.022334ms
Quite some hassle for something so (deceivingly) simple, right?
Now here comes the REALLY troubling part:
If we have a look at the simplified version (full code in the aptly named "simplified" branch):
package main
...
const (
	// schema creates the items table on startup if it is missing.
	schema = `
CREATE TABLE IF NOT EXISTS items (
id integer primary key,
title text,
description text
);
`
	// insert is the parameterized statement used by InsertItem.
	insert = `
INSERT INTO items(id, title, description) VALUES($1, $2, $3)
`
)
...
// InsertItem writes a single item to the database in its own
// transaction, panicking with context on any failure. The db handle
// is passed in so all callers share one pool.
func InsertItem(item Item, db *sqlx.DB) {
	// Beginx checks a connection out of the pool.
	tx, err := db.Beginx()
	if err != nil {
		panic(fmt.Errorf("beginning transaction: %s", err))
	}
	_, err = tx.Exec(insert, item.Id, item.Title, item.Description)
	if err != nil {
		// Rollback is largely superfluous here, but it releases the
		// connection promptly and is good practice regardless.
		tx.Rollback()
		panic(fmt.Errorf("inserting data: %s", err))
	}
	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %s", err))
	}
}
// main creates the schema and inserts 2000 items sequentially,
// timing the whole run.
func main() {
	// Log the cause of any panic (e.g. from MustExec) before exiting.
	defer handlePanics()
	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// Open exactly one *sqlx.DB; the driver pools connections for us.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// Create the items table if needed.
	db.MustExec(schema)

	start := time.Now()
	// Cap the pool; with a single sequential caller at most one
	// connection is in use, so this is merely a safety net.
	db.DB.SetMaxOpenConns(10)
	for id := 1; id <= 2000; id++ {
		InsertItem(Item{Id: id, Title: "TestBook", Description: "TestDescription"}, db)
	}
	log.Printf("All DB Inserts completed after %s\n", time.Since(start))
}
Not only is main
much more readable and a lot less complicated - it is of comparable speed. Actually, in my totally non-scientific tests all runs were on average 100ms faster than with the goroutine hassle.
To make a very long story short: Conventional wisdom has it that premature optimization is the root of all evil. And concurrency is not parallelism.
Do not use goroutines unless you have to, check your errors and react to them.