
RDS MySQL to Kinesis data stream pipeline using AWS Lambda


I am trying to extract data from an RDS MySQL instance and load it into a Kinesis data stream using the put_record boto3 API. The pymysql connection works and I am able to print the table, but I cannot write the data into the Kinesis data stream. I get this error: "Object of type datetime is not JSON serializable".

import json

import boto3
import pymysql

def lambda_handler(event, context):
    connection = pymysql.connect(
        host=endpoint,
        user=username,
        password=passwrd,
        database=database_name)

    cursor = connection.cursor()
    cursor.execute('SELECT * FROM table LIMIT 10')

    rows = cursor.fetchall()

    for row in rows:
        print("{0} {1} {2}".format(row[0], row[1], row[2]))

    kinesis = boto3.client('kinesis')

    response = kinesis.put_record(
        StreamName="test",
        Data=json.dumps(rows),
        PartitionKey="1"
    )

    connection.commit()

lambda_handler(None, None)

Printing the table works; the only issue is putting the records into the Kinesis data stream.


Solution

  • This error is thrown when an object containing a datetime value is passed to json.dumps. There are several ways to resolve it, but the quickest is to convert every datetime value to a string via the default keyword argument of json.dumps.

    import json
    from datetime import datetime

    # The default keyword argument can be set to a
    # function that gets called for objects that
    # can't otherwise be serialized.
    response = kinesis.put_record(
        StreamName="test",
        Data=json.dumps(rows, default=str),
        PartitionKey="1"
    )
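
    As a quick illustration (using hypothetical row data shaped like the output of cursor.fetchall()), default=str applies str() to each datetime that json.dumps cannot otherwise handle; for more control over the format, a custom default callback can emit ISO 8601 timestamps instead:

    ```python
    import json
    from datetime import datetime

    # Hypothetical rows, shaped like cursor.fetchall() output
    rows = [(1, "alice", datetime(2023, 5, 1, 12, 30)),
            (2, "bob", datetime(2023, 5, 2, 8, 15))]

    # Quick fix: str() is applied to any object json.dumps
    # cannot serialize on its own
    print(json.dumps(rows, default=str))
    # [[1, "alice", "2023-05-01 12:30:00"], [2, "bob", "2023-05-02 08:15:00"]]

    # More explicit: serialize datetimes as ISO 8601 and fail
    # loudly on any other unexpected type
    def json_default(obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Type {type(obj)} is not JSON serializable")

    print(json.dumps(rows, default=json_default))
    # [[1, "alice", "2023-05-01T12:30:00"], [2, "bob", "2023-05-02T08:15:00"]]
    ```

    Note that json.dumps serializes the tuples returned by fetchall() as JSON arrays, so the record body is a list of lists either way.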