python postgresql aws-lambda amazon-kinesis amazon-connect

Why does a Kinesis stream invoke my Lambda function more than once?


I am consuming Amazon Connect CTRs through Amazon Kinesis and inserting the data into Postgres. I am seeing unexpected behavior from Kinesis and my Lambda function. Whenever a CTR record arrives through Kinesis, my Lambda function is invoked, and after it inserts that record into Postgres, it is invoked again, even though I have received only one record. Here is my code; if anything is wrong with it, please correct me:

import base64
import json

import psycopg2
from psycopg2.extensions import AsIs
from psycopg2.extras import RealDictCursor

# hostt, username, passwordd and databasee are assumed to be defined elsewhere.


def lambda_handler(event, context):
    print(event['Records'])
    print(event)
    for record in event['Records']:
        conn = psycopg2.connect(
            host=hostt,
            user=username,
            password=passwordd,
            database=databasee
        )
        cur = conn.cursor(cursor_factory=RealDictCursor)
        # Kinesis delivers each payload base64-encoded; decode it, then parse the JSON.
        payload = base64.b64decode(record['kinesis']['data'])
        de_serialize_payload = json.loads(payload)
        print(len(de_serialize_payload))
        print(de_serialize_payload)
        try:
            # Note: iterating over a dict yields its keys, so this body runs
            # once per top-level key of the CTR, not once per record.
            for dsp in de_serialize_payload:
                if (de_serialize_payload['Agent'] is not None
                        and de_serialize_payload['CustomerEndpoint'] is not None
                        and de_serialize_payload['Recording'] is not None
                        and de_serialize_payload['TransferredToEndpoint'] is not None):
                    # Map CTR fields to the column names of public.ctr.
                    required_data = {
                        'arn': de_serialize_payload['Agent']['ARN'],
                        'aftercontactworkduration': de_serialize_payload['Agent']['AfterContactWorkDuration'],
                        'aftercontactworkendtimestamp': de_serialize_payload['Agent']['AfterContactWorkEndTimestamp'],
                        'aftercontactworkstarttimestamp': de_serialize_payload['Agent']['AfterContactWorkStartTimestamp'],
                        'agentconnectionattempts': de_serialize_payload['AgentConnectionAttempts'],
                        'agentinteractionduration': de_serialize_payload['Agent']['AgentInteractionDuration'],
                        'answeringmachinedetectionstatus': de_serialize_payload['AnsweringMachineDetectionStatus'],
                        'channel': de_serialize_payload['Channel'],
                        'connectedtoagenttimestamp': de_serialize_payload['Agent']['ConnectedToAgentTimestamp'],
                        'connectedtosystemtimestamp': de_serialize_payload['ConnectedToSystemTimestamp'],
                        'customerendpointaddress': de_serialize_payload['CustomerEndpoint']['Address'],
                        'customerendpointtype': de_serialize_payload['CustomerEndpoint']['Type'],
                        'customerholdduration': de_serialize_payload['Agent']['CustomerHoldDuration'],
                        'dequeuetimestamp': de_serialize_payload['Queue']['DequeueTimestamp'],
                        'disconnectreason': de_serialize_payload['DisconnectReason'],
                        'disconnecttimestamp': de_serialize_payload['DisconnectTimestamp'],
                        'queueduration': de_serialize_payload['Queue']['Duration'],
                        'enqueuetimestamp': de_serialize_payload['Queue']['EnqueueTimestamp'],
                        'hierarchygroups': de_serialize_payload['Agent']['HierarchyGroups'],
                        'initialcontactid': de_serialize_payload['InitialContactId'],
                        'initiationmethod': de_serialize_payload['InitiationMethod'],
                        'initiationtimestamp': de_serialize_payload['InitiationTimestamp'],
                        'instancearn': de_serialize_payload['InstanceARN'],
                        'lastupdatetimestamp': de_serialize_payload['LastUpdateTimestamp'],
                        'longestholdduration': de_serialize_payload['Agent']['LongestHoldDuration'],
                        'nextcontactid': de_serialize_payload['NextContactId'],
                        'numberofholds': de_serialize_payload['Agent']['NumberOfHolds'],
                        'previouscontactid': de_serialize_payload['PreviousContactId'],
                        'queuearn': de_serialize_payload['Queue']['ARN'],
                        'queuename': de_serialize_payload['Queue']['Name'],
                        'recordingdeletionreason': de_serialize_payload['Recording']['DeletionReason'],
                        'recordinglocation': de_serialize_payload['Recording']['Location'],
                        'recordingstatus': de_serialize_payload['Recording']['Status'],
                        'recordingtype': de_serialize_payload['Recording']['Type'],
                        'routingprofilearn': de_serialize_payload['Agent']['RoutingProfile']['ARN'],
                        'routingprofilename': de_serialize_payload['Agent']['RoutingProfile']['Name'],
                        'scheduledtimestamp': de_serialize_payload['ScheduledTimestamp'],
                        'systemendpointaddress': de_serialize_payload['SystemEndpoint']['Address'],
                        'systemendpointtype': de_serialize_payload['SystemEndpoint']['Type'],
                        'transfercompletedtimestamp': de_serialize_payload['TransferCompletedTimestamp'],
                        'transferredtoendpoint': de_serialize_payload['TransferredToEndpoint']['Address'],
                        'username': de_serialize_payload['Agent']['Username'],
                        'voiceidresult': de_serialize_payload['VoiceIdResult'],
                        'id': de_serialize_payload['ContactId']
                    }
                    columns = required_data.keys()
                    print(columns)
                    values = [required_data[column] for column in columns]
                    print(values)
                    # AsIs injects the column list verbatim; the values tuple is escaped by psycopg2.
                    insert_statement = "insert into public.ctr (%s) values %s;"
                    cur.execute(insert_statement, (AsIs(','.join(columns)), tuple(values)))
                    print(cur.mogrify(insert_statement, (AsIs(','.join(columns)), tuple(values))))
                    conn.commit()
                    count = cur.rowcount
                    print(count, "Record inserted successfully into public.ctr")
                    print("Agent, customer endpoint, transfer endpoint and recording data is available")
        finally:
            # Release the cursor and connection whether the insert succeeded or raised.
            cur.close()
            conn.close()

After one successful iteration, it starts iterating again. I have spent more than two days on this and have not been able to figure out what the problem is.

I would really appreciate it if someone could guide me and help me sort out this issue.


Solution

  • The issue was in my code: my function was not ending successfully. This is how the Kinesis trigger behaves: if an invocation does not finish successfully (200 OK), for example because it raises an unhandled exception or times out, the same batch of records is delivered to the function again, several times. So it is necessary to make sure the function ends properly.
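
As a rough illustration of "ending the function properly", here is a minimal sketch of a handler that decodes each Kinesis record, hands it to a storage step, and then returns normally. The `store` parameter and the `decode_record` helper are hypothetical names introduced only for this example; `store` stands in for the Postgres insert so the sketch runs without a database. It is one possible shape under those assumptions, not the exact fix that was applied.

```python
import base64
import json


def decode_record(record):
    """Decode one Kinesis record's base64 payload into a Python object."""
    return json.loads(base64.b64decode(record["kinesis"]["data"]))


def lambda_handler(event, context, store=print):
    """Process each record once, then return normally.

    `store` is a hypothetical stand-in for the database insert.
    """
    processed = 0
    failed = 0
    for record in event["Records"]:
        try:
            store(decode_record(record))
            processed += 1
        except Exception as exc:
            # Log and move on, so that one bad record does not make the
            # whole invocation fail and the batch get delivered again.
            seq = record["kinesis"].get("sequenceNumber")
            print(f"Skipping record {seq}: {exc}")
            failed += 1
    # Reaching this return without an unhandled exception is what lets the
    # invocation count as successful.
    return {"processed": processed, "failed": failed}
```

Whether to skip a failing record (as above) or re-raise so that the batch is retried is a design choice: skipping avoids repeated invocations but can drop data, so failed records would normally be logged or sent somewhere they can be inspected.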