I have a CloudFormation template in which DynamoDB events are pushed to EventBridge via DynamoDB Streams and an inline Lambda function.
The CloudFormation table stuff looks like this -
"MyTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"AttributeDefinitions": [
{
"AttributeName": "pk",
"AttributeType": "S"
},
{
"AttributeName": "sk",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST",
"KeySchema": [
{
"AttributeName": "pk",
"KeyType": "HASH"
},
{
"AttributeName": "sk",
"KeyType": "RANGE"
}
],
"GlobalSecondaryIndexes": [],
"StreamSpecification": {
"StreamViewType": "NEW_AND_OLD_IMAGES"
}
}
},
"MyTableMapping": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": {
"Ref": "MyTableStreamingFunction"
},
"StartingPosition": "LATEST",
"MaximumBatchingWindowInSeconds": 1,
"EventSourceArn": {
"Fn::GetAtt": [
"MyTable",
"StreamArn"
]
},
"MaximumRetryAttempts": 3
}
}
And the Lambda code for MyTableStreamingFunction looks like this -
import boto3, json, math, os

class Key:
    def __init__(self, pk, sk, eventname, diffkeys):
        self.pk = pk
        self.sk = sk
        self.eventname = eventname
        self.diffkeys = diffkeys

    def __str__(self):
        return "%s/%s/%s/%s" % (self.pk,
                                self.sk,
                                self.eventname,
                                "|".join(self.diffkeys))

class Entry:
    def __init__(self, key, records, context):
        self.key = key
        self.records = records
        self.context = context

    @property
    def entry(self):
        detail = {"pk": self.key.pk,
                  "sk": self.key.sk,
                  "eventName": self.key.eventname,
                  "diffKeys": self.key.diffkeys,
                  "records": self.records}
        source = self.context.function_name
        detailtype = self.key.eventname
        return {"Source": source,
                "DetailType": detailtype,
                "Detail": json.dumps(detail)}

def batch_records(records):
    def diff_keys(record):
        if not ("NewImage" in record["dynamodb"] and
                "OldImage" in record["dynamodb"]):
            return []
        newimage = {k: list(v.values())[0]
                    for k, v in record["dynamodb"]["NewImage"].items()}
        oldimage = {k: list(v.values())[0]
                    for k, v in record["dynamodb"]["OldImage"].items()}
        diffkeys = []
        for k in newimage:
            if (k not in oldimage or
                newimage[k] != oldimage[k]):
                diffkeys.append(k)
        return sorted(diffkeys)  # NB sort
    keys, groups = {}, {}
    for record in records:
        pk = record["dynamodb"]["Keys"]["pk"]["S"]
        sk = record["dynamodb"]["Keys"]["sk"]["S"].split("#")[0]
        eventname = record["eventName"]
        diffkeys = diff_keys(record)
        key = Key(pk=pk,
                  sk=sk,
                  eventname=eventname,
                  diffkeys=diffkeys)
        strkey = str(key)
        if strkey not in keys:
            keys[strkey] = key
        groups.setdefault(strkey, [])
        groups[strkey].append(record)
    return [(key, groups[strkey])
            for strkey, key in keys.items()]

def handler(event, context,
            batchsize=os.environ["BATCH_SIZE"],
            debug=os.environ["DEBUG"]):
    batchsize = int(batchsize)
    debug = debug.lower() == "true"  # safer than eval() on an env var
    events = boto3.client("events")
    if debug:
        print("--- records ---")
        print(json.dumps(event["Records"]))
    groups = batch_records(event["Records"])
    entries = [Entry(k, v, context).entry
               for k, v in groups]
    if debug:
        print("--- entries ---")
        print(json.dumps(entries))
    if entries != []:
        nbatches = math.ceil(len(entries)/batchsize)
        for i in range(nbatches):
            batch = entries[i*batchsize:(i+1)*batchsize]
            events.put_events(Entries=batch)
Note that the Lambda function does some batching around the DynamoDB pk (hash key) and sk (range key) fields, following DynamoDB single-table design principles.
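To make the grouping concrete, here's a condensed sketch of just the composite key that drives it (the sample records are invented for illustration; the real function also folds in the changed-attribute names):

```python
# Condensed sketch of the grouping key used by batch_records above.
def group_key(record):
    pk = record["dynamodb"]["Keys"]["pk"]["S"]
    sk = record["dynamodb"]["Keys"]["sk"]["S"].split("#")[0]  # sk prefix only
    return "%s/%s/%s" % (pk, sk, record["eventName"])

records = [
    {"eventName": "MODIFY",
     "dynamodb": {"Keys": {"pk": {"S": "user#1"},
                           "sk": {"S": "order#2023"}}}},
    {"eventName": "MODIFY",
     "dynamodb": {"Keys": {"pk": {"S": "user#1"},
                           "sk": {"S": "order#2022"}}}},
]
# Both records collapse onto the same key because only the sk prefix counts.
print({group_key(r) for r in records})  # {'user#1/order/MODIFY'}
```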
Now this works very nicely, but it's 2023 and I feel like DynamoDB should be able to push to EventBridge (which seems like a super-common use case) without the aid of a custom Lambda function.
I am pretty sure this is what EventBridge Pipes is for, but as usual with AWS I am finding the docs almost impossible to grok, and can't be sure.
I would normally give this to ChatGPT but of course it was trained back in 2021 and doesn't know about Pipes :/
Can someone suggest a way of replacing the inline Lambda function with Pipes, with particular focus on how I might be able to batch messages in Pipes along the same lines as the Lambda function?
TIA
EventBridge Pipes is just a way to pipe data from a source to a destination. Since you require custom processing to batch items, you could continue using Lambda.
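If you do go that route, the function shrinks to a pure transform: Pipes invokes the enrichment Lambda with the batch of stream records and forwards whatever list it returns to the target, so there's no put_events call at all. A minimal sketch, assuming the record shape from your question (the exact deduplication rule is simplified here and worth checking against your full diff-keys logic):

```python
def handler(event, context):
    # Pipes passes a list of DynamoDB stream records as the event.
    # Whatever list we return becomes the payload sent on to the target.
    seen, entries = set(), []
    for record in event:
        keys = record["dynamodb"]["Keys"]
        pk = keys["pk"]["S"]
        sk = keys["sk"]["S"].split("#")[0]
        group = (pk, sk, record["eventName"])
        if group in seen:
            continue  # collapse records in the same group, as before
        seen.add(group)
        entries.append({"pk": pk,
                        "sk": sk,
                        "eventName": record["eventName"]})
    return entries
```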
Pipes has an Enrichment step, which can invoke a Lambda function to do your batching. If you use Pipes with Enrichment, it may simplify your Lambda logic: you only handle the enrichment, and Pipes handles pushing the events to EventBridge.
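For orientation, a Pipe wired this way might look roughly like the following in CloudFormation. This is a sketch, not a tested template: the role, enrichment function, and target bus are placeholders you'd define yourself, and the property names should be checked against the AWS::Pipes::Pipe resource reference -

```json
"MyTablePipe": {
  "Type": "AWS::Pipes::Pipe",
  "Properties": {
    "RoleArn": {"Fn::GetAtt": ["MyPipeRole", "Arn"]},
    "Source": {"Fn::GetAtt": ["MyTable", "StreamArn"]},
    "SourceParameters": {
      "DynamoDBStreamParameters": {
        "StartingPosition": "LATEST",
        "BatchSize": 10,
        "MaximumBatchingWindowInSeconds": 1
      }
    },
    "Enrichment": {"Fn::GetAtt": ["MyEnrichmentFunction", "Arn"]},
    "Target": {"Fn::GetAtt": ["MyEventBus", "Arn"]}
  }
}
```

The pipe's role would need permissions to read the stream, invoke the enrichment function, and put events on the bus.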