
Streaming DynamoDB events to EventBridge using Pipes


I have a CloudFormation template in which DynamoDB events are pushed to EventBridge via DynamoDB Streams and an inline Lambda function.

The CloudFormation table stuff looks like this -

    "MyTable": {
      "Type": "AWS::DynamoDB::Table",
      "Properties": {
        "AttributeDefinitions": [
          {
            "AttributeName": "pk",
            "AttributeType": "S"
          },
          {
            "AttributeName": "sk",
            "AttributeType": "S"
          }
        ],
        "BillingMode": "PAY_PER_REQUEST",
        "KeySchema": [
          {
            "AttributeName": "pk",
            "KeyType": "HASH"
          },
          {
            "AttributeName": "sk",
            "KeyType": "RANGE"
          }
        ],
        "GlobalSecondaryIndexes": [],
        "StreamSpecification": {
          "StreamViewType": "NEW_AND_OLD_IMAGES"
        }
      }
    },
    "MyTableMapping": {
      "Type": "AWS::Lambda::EventSourceMapping",
      "Properties": {
        "FunctionName": {
          "Ref": "MyTableStreamingFunction"
        },
        "StartingPosition": "LATEST",
        "MaximumBatchingWindowInSeconds": 1,
        "EventSourceArn": {
          "Fn::GetAtt": [
            "MyTable",
            "StreamArn"
          ]
        },
        "MaximumRetryAttempts": 3
      }
    }

And the Lambda code for MyTableStreamingFunction looks like this -

import boto3, json, math, os

class Key:

    def __init__(self, pk, sk, eventname, diffkeys):
        self.pk=pk
        self.sk=sk
        self.eventname=eventname
        self.diffkeys=diffkeys

    def __str__(self):
        return "%s/%s/%s/%s" % (self.pk,
                                self.sk,
                                self.eventname,
                                "|".join(self.diffkeys))

class Entry:

    def __init__(self, key, records, context):
        self.key=key
        self.records=records
        self.context=context

    @property
    def entry(self):        
        detail={"pk": self.key.pk,
                "sk": self.key.sk,
                "eventName": self.key.eventname,
                "diffKeys": self.key.diffkeys,
                "records": self.records}
        source=self.context.function_name
        detailtype=self.key.eventname
        return {"Source": source,
                "DetailType": detailtype,
                "Detail": json.dumps(detail)}

def batch_records(records):
    def diff_keys(record):
        if not ("NewImage" in record["dynamodb"] and
                "OldImage" in record["dynamodb"]):
            return []        
        newimage={k: list(v.values())[0]
                  for k, v in record["dynamodb"]["NewImage"].items()}
        oldimage={k: list(v.values())[0]
                  for k, v in record["dynamodb"]["OldImage"].items()}
        diffkeys=[]
        for k in newimage:
            if (k not in oldimage or
                newimage[k]!=oldimage[k]):
                diffkeys.append(k)
        return sorted(diffkeys) # NB sort
    keys, groups = {}, {}
    for record in records:
        pk=record["dynamodb"]["Keys"]["pk"]["S"]
        sk=record["dynamodb"]["Keys"]["sk"]["S"].split("#")[0]
        eventname=record["eventName"]
        diffkeys=diff_keys(record)
        key=Key(pk=pk,
                sk=sk,
                eventname=eventname,
                diffkeys=diffkeys)
        strkey=str(key)
        if strkey not in keys:
            keys[strkey]=key
        groups.setdefault(strkey, [])
        groups[strkey].append(record)
    return [(key, groups[strkey])
            for strkey, key in keys.items()]

def handler(event, context,
            batchsize=os.environ["BATCH_SIZE"],
            debug=os.environ["DEBUG"]):
    batchsize=int(batchsize)
    debug=debug.lower() in ("true", "1") # safer than eval() on env input
    events=boto3.client("events")
    if debug:
        print("--- records ---")
        print(json.dumps(event["Records"]))
    groups=batch_records(event["Records"])
    entries=[Entry(k, v, context).entry
             for k, v in groups]
    if debug:
        print("--- entries ---")
        print(json.dumps(entries))
    if entries!=[]:
        nbatches=math.ceil(len(entries)/batchsize)
        for i in range(nbatches):
            batch=entries[i*batchsize:(i+1)*batchsize]
            events.put_events(Entries=batch)

Note that the Lambda function does some batching around the DynamoDB pk (hash key) and sk (range key) fields, following DynamoDB single-table design principles.
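To make that batching concrete, here is a condensed, hypothetical sketch of the grouping key (diff keys left out for brevity): two MODIFY events on items sharing a pk and an sk prefix collapse into a single group.

```python
# Hypothetical, simplified stream records: same pk, same sk prefix ("ORDER"),
# same eventName, so both should land in one group.
records = [
    {"eventName": "MODIFY",
     "dynamodb": {"Keys": {"pk": {"S": "CUSTOMER"},
                           "sk": {"S": "ORDER#001"}}}},
    {"eventName": "MODIFY",
     "dynamodb": {"Keys": {"pk": {"S": "CUSTOMER"},
                           "sk": {"S": "ORDER#002"}}}},
]

groups = {}
for r in records:
    # pk / sk-prefix / eventName, mirroring str(Key) minus the diff keys
    key = "%s/%s/%s" % (r["dynamodb"]["Keys"]["pk"]["S"],
                        r["dynamodb"]["Keys"]["sk"]["S"].split("#")[0],
                        r["eventName"])
    groups.setdefault(key, []).append(r)

print(list(groups))  # ['CUSTOMER/ORDER/MODIFY']
```

Both records end up under the single key `CUSTOMER/ORDER/MODIFY`, so only one EventBridge entry is emitted for the pair.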

Now this works very nicely, but it's 2023 and I feel like DynamoDB should be able to push to EventBridge (which seems like a super-common use case) without the aid of a custom Lambda function.

I am pretty sure this is what EventBridge Pipes are for, but as usual with AWS I am finding the docs almost impossible to grok, and can't be sure.

I would normally give this to ChatGPT but of course it was trained back in 2021 and doesn't know about Pipes :/

Can someone suggest a way of replacing the inline Lambda function with Pipes, with particular focus on how I might be able to batch messages in Pipes along the same lines as the Lambda function?

TIA


Solution

  • EventBridge Pipes are just a way to pipe data from a source to a destination. Since you require custom processing to batch items, you will still need a Lambda function somewhere.

    Pipes has an Enrichment step which can invoke a Lambda function to do your batching. If you use Pipes with Enrichment, it should simplify your Lambda logic: your function only has to transform the records, and Pipes itself handles pushing the resulting events to EventBridge.
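    As a starting point, the pipe can be declared in the same template with an `AWS::Pipes::Pipe` resource, roughly along these lines. Note this is a sketch, not a drop-in replacement: `MyPipeRole` and `MyEnrichmentFunction` are placeholder logical IDs, and the role must grant DynamoDB stream read access plus `lambda:InvokeFunction` and `events:PutEvents`.

    ```json
    "MyPipe": {
      "Type": "AWS::Pipes::Pipe",
      "Properties": {
        "RoleArn": {"Fn::GetAtt": ["MyPipeRole", "Arn"]},
        "Source": {"Fn::GetAtt": ["MyTable", "StreamArn"]},
        "SourceParameters": {
          "DynamoDBStreamParameters": {
            "StartingPosition": "LATEST",
            "BatchSize": 10,
            "MaximumBatchingWindowInSeconds": 1,
            "MaximumRetryAttempts": 3
          }
        },
        "Enrichment": {"Fn::GetAtt": ["MyEnrichmentFunction", "Arn"]},
        "Target": {"Fn::Sub": "arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:event-bus/default"},
        "TargetParameters": {
          "EventBridgeEventBusParameters": {
            "Source": "my.table.stream",
            "DetailType": "ItemChanged"
          }
        }
      }
    }
    ```

    The enrichment function receives the batch of stream records as a JSON array, and whatever array it returns is what Pipes delivers to the target - so the grouping logic from the question would live there; Pipes itself does no custom grouping.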