Firstly, I have seen this question and understand the takeaways from the points made.
However I'm unsure how I can improve my current situation.
I have a placeholder entry in my DynamoDB that will be updated when I receive data from an SQS queue that itself receives sporadic data processed via an external API.
The data in my table looks something like:
{
name: "ID",
type: "S",
},
{
name: "RESULTS",
type: "L"
}
With the ID
being the key
and the RESULTS
being an array of result object that are sent in.
So an example would be
{
ID: "ID123",
RESULTS: [{resultId: 1, result: 'fail'}, {resultId: 2, result: 'pass'}]
The API that fires results for me to persist can fire quickly and concurrently, so I am needing to handle any race conditions.
Currently my plan is to:
RESULTS
dataThe issue I'm thinking of is, how do I handle 2 sets of results coming in at the same time? If this were to happen, both requests would:
RESULTS
data from the DB// This is within a Lambda handler, just FYI
return await Promise.all(
// event and Records refers to the SQS message coming through
event?.Records?.map(async record => {
const { body }: SQSRecord = record;
const { id, data: { resultsFromApi } } = JSON.parse(body);
const dbData = // fetch results from DB by ID
const {RESULTS: { L: RESULTS }} = dbData?.Item;
// Combine results
const combonedResults = [...RESULTS, ...resultsFromApi]
// store new results in DB alongside ID
storeInDb(id, combonedResults);
}),
)
How can one implement concurrency to protect from the race conditions and overwrites?
FYI - I'm currently using the PutItemCommand
rather than using the update
variant just as I'm not too familiar with working on Dynamo - So if that will cause an issue also, let me know.
Let me tell you how I would do it, and then I'll explain how you could do it with your current design if you have an additional requirement that you didn't mention.
By far the best approach here is to simply update the item, without reading it.
result = table.update_item(
Key={
'ID': hash_key
},
UpdateExpression="SET RESULTS = list_append(RESULTS, :i)",
ExpressionAttributeValues={
':i': <YOUR LIST OF RESULTS>,
},
ReturnValues="UPDATED_NEW"
)
With this approach all updates to the item will be atomic, and will not overwrite. ReturnValues
gives you a free read of them item, should you need it.
You mentioned you read the item first, if that something that is necessary because you have some business logic that forces you to, then you can implement optimistic locking.
Optimistic locking enforces a version number, which you place a condition on while updating an item. Again, this method is nowhere near as efficient as the previous but it's an option that may suit your needs. You can read more here: