I've been using the Golang DynamoDB SDK for a while now, and recently I started seeing this error type come back:
RequestError: send request failed caused by: Post "https://dynamodb.[REGION].amazonaws.com/": read tcp [My IP]->[AWS IP]: read: connection reset by peer
This only seems to occur when writing large amounts of data to DynamoDB, although the error is not limited to any particular type of request. I've seen it in both UpdateItem
and BatchWriteItem
requests. Furthermore, as the failure isn't consistent, I can't localize it to a particular line of code. It seems that the error is related to some sort of network issue between my service and AWS but, as it doesn't come back as a throttling exception, I'm not sure how to debug it. Finally, as the response comes back from a write request, I don't think retry logic is really the solution here either.
Here's my batch-write code:
func (conn *Connection) BatchWrite(tableName string, requests []*dynamodb.WriteRequest) error {
// Get the length of the requests; if there aren't any then return because there's nothing to do
length := len(requests)
log.Printf("Attempting to write %d items to DynamoDB", length)
if length == 0 {
return nil
}
// Get the number of requests to make
numRequests := length / 25
if length%25 != 0 {
numRequests++
}
// Create the variables necessary to manage the concurrency
var wg sync.WaitGroup
errs := make(chan error, numRequests)
// Attempt to batch-write the requests to DynamoDB; because DynamoDB limits the number of concurrent
// items in a batch request to 25, we'll chunk the requests into 25-report segments
sections := make([][]*dynamodb.WriteRequest, numRequests)
for i := 0; i < numRequests; i++ {
// Get the end index which is 25 greater than the current index or the end of the array
// if we're getting close
end := (i + 1) * 25
if end > length {
end = length
}
// Add to the wait group so that we can ensure all the concurrent processes finish
// before we close down the process
wg.Add(1)
// Write the chunk to DynamoDB concurrently
go func(wg *sync.WaitGroup, index int, start int, end int) {
defer wg.Done()
// Call the DynamoDB operation; record any errors that occur
if section, err := conn.batchWriteInner(tableName, requests[start:end]); err != nil {
errs <- err
} else {
sections[index] = section
}
}(&wg, i, i*25, end)
}
// Wait for all the goroutines to finish
wg.Wait()
// Attempt to read an error from the channel; if we get one then return it
// Otherwise, continue. We have to use the select here because this is
// the only way to attempt to read from a channel without it blocking
select {
case err, ok := <-errs:
if ok {
return err
}
default:
break
}
// Now, we've probably gotten retries back so take these and combine them into
// a single list of requests
retries := sections[0]
if len(sections) > 1 {
for _, section := range sections[1:] {
retries = append(retries, section...)
}
}
// Rewrite the requests and return the result
return conn.BatchWrite(tableName, retries)
}
func (conn *Connection) batchWriteInner(tableName string, requests []*dynamodb.WriteRequest) ([]*dynamodb.WriteRequest, error) {
// Create the request
request := dynamodb.BatchWriteItemInput{
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityNone),
ReturnItemCollectionMetrics: aws.String(dynamodb.ReturnItemCollectionMetricsNone),
RequestItems: map[string][]*dynamodb.WriteRequest{
tableName: requests,
},
}
// Attempt to batch-write the items with an exponential backoff
var result *dynamodb.BatchWriteItemOutput
err := backoff.Retry(func() error {
// Attempt the batch-write; if it fails then back-off and wait. Otherwise break out
// of the loop and return
var err error
if result, err = conn.inner.BatchWriteItem(&request); err != nil {
// If we have an error then what we do here will depend on the error code
// If the error code is for exceeded throughput, exceeded request limit or
// an internal server error then we'll try again. Otherwise, we'll break out
// because the error isn't recoverable
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException:
case dynamodb.ErrCodeRequestLimitExceeded:
case dynamodb.ErrCodeInternalServerError:
return err
}
}
// We received an error that won't be fixed by backing off; return this as a permanent
// error so we can tell the backoff library that we want to break out of the exponential backoff
return backoff.Permanent(err)
}
return nil
}, backoff.NewExponentialBackOff())
// If the batch-write failed then return an error
if err != nil {
return nil, err
}
// Roll the unprocessed items into a single list and return them
var list []*dynamodb.WriteRequest
for _, item := range result.UnprocessedItems {
list = append(list, item...)
}
return list, nil
}
Has anyone else dealt with this issue before? What's the correct approach here?
I faced similar issue, and I found this question has been answered in this AWS SDK GitHub thread: https://github.com/aws/aws-sdk-go/issues/5037
To summarise: