Tags: python, amazon-web-services, boto3, amazon-kinesis

Kinesis consumer returning empty record (boto, python)


I am having trouble checking the data I am writing to Kinesis. It seems like the following example should work, but I am getting an empty list returned from get_records (in the Records field). Any ideas what could be going on?

import uuid
import boto3
import time


streamname = 'mytestStream'
session = boto3.session.Session() 
kinesis_client = session.client('kinesis', region_name='us-east-1')


##### WRITE TO KINESIS

partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)

time.sleep(5)


##### READ FROM KINESIS

shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)

Thanks!


Solution

  • With a LATEST shard iterator, you have to start reading the stream before you put the record, because LATEST only returns records written after the iterator was created. In your example the timeline is as follows:

    • at t0: The latest checkpoint in the stream is at 101.
    • at t1 (main thread): You put a record to the stream, and the record is at checkpoint 102.
    • at t2 (main thread): You start tailing the stream at the LATEST point, which is now 103, so the record at 102 is already behind the iterator and is never returned.

    To fix this, run the producer and the consumer in different threads, so that the consumer is already tailing the stream when the producer writes. The correct flow looks like this:

    • at t0 (consumer thread): Start tailing the stream at the LATEST position, which is 201.
    • at t1 (producer thread): You put a record to the stream, and the record is placed at checkpoint 202.
    • at t2 (consumer thread): Because the shard on the server side has moved forward (you just added data) and you have been tailing the shard since checkpoint 201, you iterate to the new checkpoint 202 and your data is returned.
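
The t0/t1/t2 ordering above can be sketched roughly as follows. This is only an illustration, not a definitive implementation: it runs the three steps sequentially in one thread rather than in the two threads suggested above, since what matters is that the iterator exists before the write. The helper names (open_latest_iterator, poll_records, write_then_read) and the max_polls and delay parameters are made up for this example. It also assumes, like the code in the question, that the stream has a single shard; with several shards the record may land on a shard other than the first one. The client argument is expected to be a boto3 Kinesis client.

```python
import time
import uuid


def open_latest_iterator(client, stream_name):
    """Return a LATEST iterator for the first shard of the stream."""
    description = client.describe_stream(StreamName=stream_name)
    # Assumption: a single-shard stream, as in the question.
    shard_id = description["StreamDescription"]["Shards"][0]["ShardId"]
    response = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="LATEST",
    )
    return response["ShardIterator"]


def poll_records(client, shard_iterator, max_polls=10, delay=1.0):
    """Call get_records until records arrive or max_polls is used up."""
    for _ in range(max_polls):
        response = client.get_records(ShardIterator=shard_iterator)
        if response["Records"]:
            return response["Records"]
        # An empty batch can happen; continue from the iterator we were handed.
        shard_iterator = response.get("NextShardIterator")
        if shard_iterator is None:  # no further iterator, e.g. the shard was closed
            break
        time.sleep(delay)
    return []


def write_then_read(client, stream_name, data, max_polls=10, delay=1.0):
    """Open the iterator first, then write, then read forward."""
    # t0: position the consumer before anything is written.
    shard_iterator = open_latest_iterator(client, stream_name)
    # t1: only now put the record.
    partition_key = str(uuid.uuid4())[:8]
    client.put_record(
        StreamName=stream_name, Data=data, PartitionKey=partition_key
    )
    # t2: read forward from the position taken at t0.
    return poll_records(client, shard_iterator, max_polls, delay)
```

With a real client this might be called as write_then_read(boto3.client('kinesis', region_name='us-east-1'), 'mytestStream', b'mytestdata'). The polling loop is there because a single get_records call can come back with an empty Records list even when the ordering is right, so it follows NextShardIterator for a few attempts before giving up.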