
Losing data with the goroutines


I am writing an AWS Lambda function that queries an RDS table, converts the result to JSON, and returns it. However, the JSON does not contain all the records that the SQL query returned. For example, when I query 1500 records from the table, the JSON contains only 1496 to 1500 records (0-5 records fewer), and the number varies from run to run. I suspect I have messed something up with the sync.WaitGroup.

Below is the SQL Server Query

SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0

Below is my code

// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {

    receiver := make([]string, totalColumns)

    is := make([]interface{}, len(receiver))
    for i := range is {
        is[i] = &receiver[i]
    }

    err := rows.Scan(is...)

    if err != nil {
        fmt.Println("Error reading rows: " + err.Error())
    }

    TotalRecordsInParseRowfunction++
    return receiver
}

// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {

    // Query Table
    rows, err := conn.Query(query)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    println("Rows:", rows)
    defer rows.Close()

    // Get the column names
    columns, err := rows.Columns()
    // fmt.Println("columns", columns)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    totalColumns := len(columns)
    var resp []map[string]string // Declare the type of final response which will be used to create JSON
    var waitgroup sync.WaitGroup

    // Iterate over all the rows returned
    for rows.Next() {

        waitgroup.Add(1)
        TotalRecordsCount++
        row := parseRow(rows, totalColumns)

        go func() {
            // Create a map of the row
            respRow := map[string]string{} // Declare the type of each row of response
            for count := range row {
                respRow[columns[count]] = row[count]
            }
            // fmt.Println("\n\nrespRow", respRow)

            resp = append(resp, respRow)
            TotalRecordsAppendedCount++
            waitgroup.Done()
        }()
    }
    waitgroup.Wait()

    // If no rows are returned
    if len(resp) == 0 {
        fmt.Println("MESSAGE: No records are available")
        return "", errors.New("MESSAGE: No records are available")
    }

    // Create JSON
    respJSON, _ := json.Marshal(resp)
    fmt.Println("Response", string(respJSON))

    fmt.Println("\n--------------Summary---------------")
    fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
    fmt.Println("TotalRecordsCount", TotalRecordsCount)
    fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
    fmt.Println("Object Length", len(resp))

    return string(respJSON), nil // Return JSON

}

Below is the output summary I am getting

--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496

Solution

  • Your code is racy. Multiple goroutines are writing to resp without any mutual exclusion, so you lose data.

    You could add a mutex lock/unlock around that write. However, the code in the goroutine does not warrant its own goroutine, because it only copies a row into a map. Handling that code without the goroutine will be a lot easier, and it will probably run much faster without the goroutine scheduling overhead. Unless you plan to put more logic in that goroutine, I suggest you remove it.

    Here's some more information about what's probably going on. In Go versions before 1.14, a goroutine yields to others only when it makes certain function calls, such as blocking library calls; Go 1.14 added asynchronous preemption. Looking at the code, it is unlikely that your goroutines ever yield. Since you already observed data loss (which means there is a race condition), the goroutines are most likely running in parallel on more than one core.

    The race is here:

    resp = append(resp, respRow)
    

    Without mutual exclusion, one goroutine may look at resp and see that it can write to its nth element. Another goroutine (running on a separate core) can do the same and write there successfully. The first goroutine still thinks that element is empty, so it overwrites it and updates resp. When this happens, you lose one element.

    If you add mutual exclusion to this code, you'll essentially force all goroutines to run sequentially, because they are not really doing anything else. Also, since the order in which goroutines execute is not deterministic, the elements of resp will end up in an arbitrary order. In short, this is one of those instances where you should execute the code sequentially.