
How to correctly read and checkpoint a Kinesis stream with multiple shards with the Python boto3 client?


AWS Kinesis seems to be made for use with Lambdas, with the concept of shards tied in with bandwidth and latency - but I'm trying to read it from a non-Lambda service, a long-running process which would ultimately call get_records() in a loop.

But apparently there's some manual work needed to correctly read from all the shards, including handling resharding and checkpointing, which I think is under-documented. My question is: how to correctly read from all the shards in a Kinesis stream from a single long-running Python boto3 client and handle all the special steps?


Solution

  • AWS Kinesis seems to be made for use with Lambdas.

    Kinesis predates Lambda by a year.

    My question is: how to correctly read from all the shards in a Kinesis stream from a single long-running Python boto3 client and handle all the special steps?

    The brief outline is this:

    1. Every Kinesis record has a property called SequenceNumber. It's a string that can be converted to a decimal number. It's guaranteed to be unique within a single shard but not across shards.
    2. The first time your application starts, you retrieve a list of all the shards that exist in your stream. These shards can be in parent-child relationships: some shards are products of a merge of two parent shards; others are products of a split of a single shard.
    3. Every shard has a starting sequence number and may have an ending sequence number (if the shard has been closed by a split or merge).
    4. The order of messages is defined like this:
      1. Messages within a single shard are ordered by sequence number
      2. The messages in a descendant shard come later than the messages in all its ancestor (parent, grandparent and so on) shards
      3. If two shards are not in a transitive child-parent relationship, the order of messages between them is undefined
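    One gotcha with point 1: since sequence numbers are numeric strings, they must be compared as integers, not lexicographically. A short sketch:

    ```python
    # Sequence numbers are decimal strings of varying length; compare them
    # numerically, or a shorter string representing a larger number sorts wrong.
    def seq_less_than(a: str, b: str) -> bool:
        return int(a) < int(b)

    print(seq_less_than("9", "10"))  # True: 9 < 10 numerically
    print("9" < "10")                # False: lexicographic string comparison
    ```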

    Given all these constraints, your application should do this:

    1. Get a list of existent shards in your stream by calling ListShards.
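    A minimal sketch of this step, assuming a boto3 `kinesis` client. Note that ListShards is paginated, and subsequent pages must pass only NextToken:

    ```python
    def list_all_shards(kinesis, stream_name):
        """Return every shard in the stream, following NextToken pagination."""
        shards = []
        kwargs = {"StreamName": stream_name}
        while True:
            resp = kinesis.list_shards(**kwargs)
            shards.extend(resp["Shards"])
            token = resp.get("NextToken")
            if not token:
                return shards
            # Later pages take only the token, not the stream name.
            kwargs = {"NextToken": token}
    ```

    Usage would be something like `list_all_shards(boto3.client("kinesis"), "my-stream")` (the stream name here is a placeholder).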

    2. Read the records from the shards by calling GetShardIterator first and then GetRecords in a sequential loop. The result of every GetRecords is a batch of messages for the shard, the next iterator token, and, possibly, a list of child shards, which is returned when the current shard is exhausted.

      1. For the first call to GetShardIterator, use AFTER_SEQUENCE_NUMBER and provide the checkpointed sequence number that you have last processed (see below).
      2. If you haven't processed anything yet in this shard (no checkpoint record), use AT_SEQUENCE_NUMBER with the starting sequence number returned by ListShards.

      GetRecords might return no records which is normal. If that's the case, you might want to introduce a delay before the next call.
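    A sketch of this step follows. The iterator-type choice is factored into a pure helper; `handle_record` is a placeholder for your own processing callback, and the 1-second back-off on empty batches is an arbitrary choice:

    ```python
    import time

    def iterator_args(stream_name, shard, checkpoint):
        """Build GetShardIterator arguments per the rules above.
        `checkpoint` is the last processed sequence number, or None."""
        args = {"StreamName": stream_name, "ShardId": shard["ShardId"]}
        if checkpoint is not None:
            args["ShardIteratorType"] = "AFTER_SEQUENCE_NUMBER"
            args["StartingSequenceNumber"] = checkpoint
        else:
            args["ShardIteratorType"] = "AT_SEQUENCE_NUMBER"
            args["StartingSequenceNumber"] = (
                shard["SequenceNumberRange"]["StartingSequenceNumber"])
        return args

    def read_shard(kinesis, stream_name, shard, checkpoint, handle_record):
        """Sequentially drain one shard, calling handle_record per record."""
        it = kinesis.get_shard_iterator(
            **iterator_args(stream_name, shard, checkpoint))["ShardIterator"]
        while it is not None:
            resp = kinesis.get_records(ShardIterator=it, Limit=1000)
            for record in resp["Records"]:
                handle_record(record)
            if not resp["Records"]:
                time.sleep(1)  # empty batches are normal; back off a little
            # NextShardIterator disappears once a closed shard is drained.
            it = resp.get("NextShardIterator")
    ```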

    3. Process the messages returned by GetRecords. It's up to your application how to define the retry logic if some of the messages fail to process. Kinesis ideology assumes that message ordering is important, so you might process messages one by one, even if they are returned in a single batch. If a message cannot be processed after a number of retries, it might make sense to drop it and move on.

    4. Once a message is processed, you should checkpoint your position in the shard. This is a fancy way of saying that you should write down the shard id and the sequence number of the last processed message somewhere. It can be a database, a file on disk, a key-value store or something like that. If your application dies, the next time it restarts it will only start processing the shard from the last checkpointed position. You might want to checkpoint less often than once per message, as long as you're fine with double processing in the rare case that your application dies and restarts.
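    A minimal checkpoint store sketch, using SQLite purely as an example of a durable store (the table name and schema are my own invention):

    ```python
    import sqlite3

    class CheckpointStore:
        """Remembers the last processed sequence number per shard id."""

        def __init__(self, path=":memory:"):
            self.db = sqlite3.connect(path)
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS checkpoints ("
                "shard_id TEXT PRIMARY KEY, sequence_number TEXT)")

        def save(self, shard_id, sequence_number):
            # Overwrite any previous checkpoint for this shard.
            self.db.execute(
                "INSERT OR REPLACE INTO checkpoints VALUES (?, ?)",
                (shard_id, sequence_number))
            self.db.commit()

        def load(self, shard_id):
            """Return the checkpoint, or None if the shard is untouched."""
            row = self.db.execute(
                "SELECT sequence_number FROM checkpoints WHERE shard_id = ?",
                (shard_id,)).fetchone()
            return row[0] if row else None
    ```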

    5. Once a shard is exhausted (which can only happen if it's been split or merged), meaning that you have processed its last record, you should record this fact in your checkpoint store, so that you can remove it from your processing list.
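    Detecting exhaustion can be sketched like this: once a closed shard is fully read, GetRecords stops returning NextShardIterator and lists the child shards. The `mark_done` and `enqueue` callbacks are placeholders for your checkpoint store and work queue:

    ```python
    def shard_exhausted(get_records_response):
        """Open shards always return a NextShardIterator; a closed shard
        stops returning one once its last record has been read."""
        return get_records_response.get("NextShardIterator") is None

    def finish_shard(response, mark_done, enqueue):
        """On exhaustion, record the fact and hand the children over."""
        if shard_exhausted(response):
            for child in response.get("ChildShards", []):
                enqueue(child["ShardId"])
            mark_done()
    ```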

    6. It's up to your application how to parallelize load between the shards, as long as you mind the constraints. If you have two shards 1 and 2 that were merged into shard 3, then 1 and 2 can be processed in parallel, but 3 can only be processed after both 1 and 2. This is the hardest part in this business. You might want to take some inspiration for implementing it from here.
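    The dependency check itself can be sketched with the ParentShardId and AdjacentParentShardId fields that ListShards returns on each shard:

    ```python
    def ready_shards(shards, finished):
        """Return shards that may be processed now: those whose parents
        (both of them, for a merge) are finished or no longer exist.

        `shards` is a list of shard dicts as returned by ListShards;
        `finished` is a set of shard ids already fully drained.
        """
        existing = {s["ShardId"] for s in shards}
        ready = []
        for s in shards:
            if s["ShardId"] in finished:
                continue
            parents = [s.get("ParentShardId"), s.get("AdjacentParentShardId")]
            # A parent that expired out of the stream no longer blocks us.
            if all(p is None or p not in existing or p in finished
                   for p in parents):
                ready.append(s)
        return ready
    ```

    With the merge example above, shards 1 and 2 come back as ready first, and shard 3 only once both appear in `finished`.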

    AWS has a library specifically designed for all of this, the KCL (Kinesis Client Library).

    Unfortunately, it has a hard dependency on DynamoDB as a checkpoint store and can only natively integrate with Java code (although it can invoke child processes written in any language and communicate with them over standard process streams).