Search code examples
goaws-lambdadynamodb-queriesamazon-dynamodb-index

Executing DynamoDB queries in parallel (BatchGetItems for Global Secondary Index)


The idea here is run multiple DynamoDB queries in parallel as the query is run over a GSI. As of now BatchGetItems doesn't support querying over Indexes and the recommended approach is to Query the data in parallel. I'm using go routines with wg to take care of the executions of the routines in parallel.

The input to the function is an array of strings with an ID, the outputs are the attributes of the Ids.

When the function is run locally, there is no issue, however, when the function is run on AWS-Lambda, the returned data keeps growing;

ie; Input 2 items should output 2 items. If the function is tested on AWS-Lambda,

  • 1st time the function returns 2 item
  • 2nd time it returns 4 items (same items are repeated 2 times)
  • 3rd time it returns 6 items (same items are repeated 4 times)

and so on. Here is a snippet of the code. Is there something not handled correctly that's having the lambda output the extra set of data every-time the lambda is run?

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

//Final Output Interface
var bulkOutput []interface{}

func exitWithError(err error) {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
    Ids      []string `json:"ids,omitempty"`
}

//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
    return DynamoDBBatchGetRecords(data), nil
}

func main() {
    lambda.Start(HandleRequest)
}

func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

    var wg sync.WaitGroup
    var mutex = &sync.Mutex{}

    iterations := len(a.Ids)
    wg.Add(iterations)
    for i := 0; i < iterations; i++ {
        go QueryOutput(a.Ids[i], &wg, mutex)
    }

    wg.Wait()
    return bulkOutput

}

//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
    var outputData []interface{}
    defer wg.Done()
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("aws-region"),
    })
    if err != nil {
        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
    }
    ddb := dynamodb.New(sess)
    queryInput := &dynamodb.QueryInput{
        Limit:                aws.Int64(1),
        TableName:            aws.String("table-name"),
        IndexName:            aws.String("gsi-index"),
        ScanIndexForward:     aws.Bool(false),
        ConsistentRead:       aws.Bool(false),
        KeyConditions: map[string]*dynamodb.Condition{
            "column_name": {
                ComparisonOperator: aws.String("EQ"),
                AttributeValueList: []*dynamodb.AttributeValue{
                    {
                        S: aws.String(data),
                    },
                },
            },
        },
    }
    output, err := ddb.Query(queryInput)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
    }
    err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
    }
    mtx.Lock()
    bulkOutput = append(bulkOutput, outputData)
    mtx.Unlock()
}

Solution

  • According to documentation, global variables are independent of your Lambda function's handler code. This was causing the buffer to build up over time.

    Rectified reference pasted below.

    package main
    
    import (
        "context"
        "fmt"
        "os"
        "sync"
    
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/dynamodb"
        "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
    )
    
    func exitWithError(err error) {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    
    //HandleRequest : Lambda entry point
    func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
        output := DynamoDBBatchGetRecords(data)
        return output, nil
    }
    
    func main() {
        lambda.Start(HandleRequest)
    }
    
    func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
        var dataOut []interface{}
        var wg = &sync.WaitGroup{}
        var mtx = &sync.Mutex{}
    
        iterations := len(a.Ids)
        wg.Add(iterations)
        for i := 0; i < i; i++ {
            go func(i int) {
                defer wg.Done()
                var outputData []interface{}
                sess, err := session.NewSession(&aws.Config{
                    Region: aws.String("aws-region"),
                })
                if err != nil {
                    exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
                }
                ddb := dynamodb.New(sess)
                queryInput := &dynamodb.QueryInput{
                    Limit:            aws.Int64(1),
                    TableName:        aws.String("table"),
                    IndexName:        aws.String("index"),
                    ScanIndexForward: aws.Bool(false),
                    ConsistentRead: aws.Bool(false),
                    KeyConditions: map[string]*dynamodb.Condition{
                        "index-column": {
                            ComparisonOperator: aws.String("EQ"),
                            AttributeValueList: []*dynamodb.AttributeValue{
                                {
                                    S: aws.String(a.Ids[i]),
                                },
                            },
                        },
                    },
                }
                output, err := ddb.Query(queryInput)
    
                if err != nil {
                    exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
                }
                err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
                if err != nil {
                    exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
                }
    
                mtx.Lock()
                dataOut = append(dataOut, outputData[0])
                mtx.Unlock()
    
            }(i)
        }
        wg.Wait()
        return dataOut
    }