
read tcp read: connection reset by peer


I've been using the Golang DynamoDB SDK for a while now, and recently I started seeing this error type come back:

RequestError: send request failed caused by: Post "https://dynamodb.[REGION].amazonaws.com/": read tcp [My IP]->[AWS IP]: read: connection reset by peer

This only seems to occur when writing large amounts of data to DynamoDB, although it isn't limited to any particular type of request: I've seen it in both UpdateItem and BatchWriteItem requests. Because the failure isn't consistent, I can't localize it to a particular line of code. It looks like a network issue between my service and AWS but, since it doesn't come back as a throttling exception, I'm not sure how to debug it. Finally, because the error comes back from a write request, I don't think retry logic is really the solution here either.

Here's my batch-write code:

func (conn *Connection) BatchWrite(tableName string, requests []*dynamodb.WriteRequest) error {

    // Get the length of the requests; if there aren't any then return because there's nothing to do
    length := len(requests)
    log.Printf("Attempting to write %d items to DynamoDB", length)
    if length == 0 {
        return nil
    }

    // Get the number of requests to make
    numRequests := length / 25
    if length%25 != 0 {
        numRequests++
    }

    // Create the variables necessary to manage the concurrency
    var wg sync.WaitGroup
    errs := make(chan error, numRequests)

    // Attempt to batch-write the requests to DynamoDB; because DynamoDB limits the number of
    // items in a batch request to 25, we'll chunk the requests into 25-item segments
    sections := make([][]*dynamodb.WriteRequest, numRequests)
    for i := 0; i < numRequests; i++ {

        // Get the end index which is 25 greater than the current index or the end of the array
        // if we're getting close
        end := (i + 1) * 25
        if end > length {
            end = length
        }

        // Add to the wait group so that we can ensure all the concurrent processes finish
        // before we close down the process
        wg.Add(1)

        // Write the chunk to DynamoDB concurrently
        go func(wg *sync.WaitGroup, index int, start int, end int) {
            defer wg.Done()

            // Call the DynamoDB operation; record any errors that occur
            if section, err := conn.batchWriteInner(tableName, requests[start:end]); err != nil {
                errs <- err
            } else {
                sections[index] = section
            }
        }(&wg, i, i*25, end)
    }

    // Wait for all the goroutines to finish
    wg.Wait()

    // Attempt to read an error from the channel; if we get one then return it
    // Otherwise, continue. We have to use the select here because this is
    // the only way to attempt to read from a channel without it blocking
    select {
    case err, ok := <-errs:
        if ok {
            return err
        }
    default:
        break
    }

    // Now, we've probably gotten retries back so take these and combine them into
    // a single list of requests
    retries := sections[0]
    if len(sections) > 1 {
        for _, section := range sections[1:] {
            retries = append(retries, section...)
        }
    }

    // Rewrite the requests and return the result
    return conn.BatchWrite(tableName, retries)
}

func (conn *Connection) batchWriteInner(tableName string, requests []*dynamodb.WriteRequest) ([]*dynamodb.WriteRequest, error) {

    // Create the request
    request := dynamodb.BatchWriteItemInput{
        ReturnConsumedCapacity:      aws.String(dynamodb.ReturnConsumedCapacityNone),
        ReturnItemCollectionMetrics: aws.String(dynamodb.ReturnItemCollectionMetricsNone),
        RequestItems: map[string][]*dynamodb.WriteRequest{
            tableName: requests,
        },
    }

    // Attempt to batch-write the items with an exponential backoff
    var result *dynamodb.BatchWriteItemOutput
    err := backoff.Retry(func() error {

        // Attempt the batch-write; if it fails then back-off and wait. Otherwise break out
        // of the loop and return
        var err error
        if result, err = conn.inner.BatchWriteItem(&request); err != nil {

            // If we have an error then what we do here will depend on the error code
            // If the error code is for exceeded throughput, exceeded request limit or
            // an internal server error then we'll try again. Otherwise, we'll break out
            // because the error isn't recoverable
            if aerr, ok := err.(awserr.Error); ok {
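                switch aerr.Code() {
                // Go's switch does not fall through, so the retryable codes must share one case;
                // as separate empty cases, only the last one would actually be retried
                case dynamodb.ErrCodeProvisionedThroughputExceededException,
                    dynamodb.ErrCodeRequestLimitExceeded,
                    dynamodb.ErrCodeInternalServerError:
                    return err
                }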
            }

            // We received an error that won't be fixed by backing off; return this as a permanent
            // error so we can tell the backoff library that we want to break out of the exponential backoff
            return backoff.Permanent(err)
        }

        return nil
    }, backoff.NewExponentialBackOff())

    // If the batch-write failed then return an error
    if err != nil {
        return nil, err
    }

    // Roll the unprocessed items into a single list and return them
    var list []*dynamodb.WriteRequest
    for _, item := range result.UnprocessedItems {
        list = append(list, item...)
    }

    return list, nil
}

Has anyone else dealt with this issue before? What's the correct approach here?


Solution

  • I faced a similar issue and found that this question has already been answered in this AWS SDK GitHub thread: https://github.com/aws/aws-sdk-go/issues/5037

    To summarise:

    • The error originates directly from the networking layer, not from DynamoDB itself.
    • It's not reproducible. You can enable logging to make sure that retries are taking effect, and that should be good enough.