Tags: java, aws-sdk, amazon-dynamodb-streams, amazon-kcl

When AWS KCL processRecords fails, how do I "mark" the records so they are reprocessed?


I'm working with AWS DynamoDB Streams, whose API is based on the AWS KCL.

Sometimes I receive records that I fail to process, and I want those records to be available later so I can reprocess them. For instance, I save them to a remote DB and sometimes experience network issues.

My questions are:

  1. Can I use the Checkpointer in some way to indicate that I didn't handle the records?
  2. Should I just avoid calling Checkpointer.checkpoint()? Will it have any effect if I still call it in the next call of processRecords?
  3. Is there an exception I can throw for that purpose?

Solution

  • KCL does not provide a built-in redrive mechanism of this sort. Once processRecords returns (whether it threw an exception or returned successfully), KCL considers those records processed and moves on, even if processing failed internally.

    If you want to reprocess some records later, you need to capture them and store them somewhere else for a later reprocessing attempt (with the obvious caveat that they won't be processed in order relative to the rest of the stream).

    The simplest solution for this is to have your record processor logic identify the failed records (before returning to KCL) and send them to an SQS queue. Then, the records aren't lost, and they're available for processing at your leisure (or by another process consuming the SQS queue, possibly with a DLQ mechanism for handling repeated failures / give-up scenarios).
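    A minimal sketch of that approach, assuming records are plain strings and using two hypothetical interfaces: RecordSink stands in for the remote DB write, and FailedRecordQueue stands in for the SQS queue (in real code this would wrap something like SqsClient.sendMessage from the AWS SDK for Java 2.x). The point is that every failure is caught and parked before control returns to KCL:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only, not the KCL API: records are plain Strings, and RecordSink /
// FailedRecordQueue are hypothetical stand-ins for the DB write and SQS send.
public class RedriveSketch {

    /** Hypothetical: the remote DB write that may fail (e.g. network issues). */
    interface RecordSink {
        void save(String record) throws Exception;
    }

    /** Hypothetical stand-in for an SQS queue used for later reprocessing. */
    interface FailedRecordQueue {
        void send(String record);
    }

    private final RecordSink sink;
    private final FailedRecordQueue redriveQueue;

    RedriveSketch(RecordSink sink, FailedRecordQueue redriveQueue) {
        this.sink = sink;
        this.redriveQueue = redriveQueue;
    }

    /**
     * Logic you would run inside processRecords: every record is attempted,
     * and failures go to the redrive queue instead of being thrown back to
     * KCL (which would not retry them). Returns the number of redriven records.
     */
    int processBatch(List<String> records) {
        int redriven = 0;
        for (String record : records) {
            try {
                sink.save(record);
            } catch (Exception e) {
                // Park the record for later, out-of-order reprocessing.
                redriveQueue.send(record);
                redriven++;
            }
        }
        return redriven;
    }

    public static void main(String[] args) {
        List<String> saved = new ArrayList<>();
        List<String> queued = new ArrayList<>();
        RedriveSketch processor = new RedriveSketch(
                record -> {
                    if (record.startsWith("bad")) {
                        throw new java.io.IOException("simulated network error");
                    }
                    saved.add(record);
                },
                queued::add);
        int redriven = processor.processBatch(List.of("a", "bad-1", "b"));
        System.out.println("saved=" + saved + " queued=" + queued + " redriven=" + redriven);
    }
}
```

    Running main prints saved=[a, b] queued=[bad-1] redriven=1. Catching per record rather than around the whole batch means one bad record does not force the good records in the same batch to be redriven as well.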

    To answer your specific questions:

    1. Nope. Checkpointing just says "I've got this far; don't look at anything before the checkpoint."
    2. Think of checkpointing as a global state. Once it's set, it covers everything that came before it. You also don't need to checkpoint on every call to processRecords; you might do it every X seconds, or every Y records, etc.
    3. Not at the KCL level. You could use a special exception type internally and catch it at the outer level of processRecords, just before you return to Kinesis. Or you could simply catch all exceptions; it's up to you how specific you want to be with your redrive logic.
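    If you checkpoint on a schedule rather than on every call, a small helper can track both thresholds (every Y records or every X seconds). This is a hypothetical class, not part of KCL; the clock is passed in (System.nanoTime() in real code) so the logic is easy to test. Inside processRecords you would call the checkpointer's checkpoint() whenever shouldCheckpoint returns true, then call checkpointed(...):

```java
import java.time.Duration;

// Hypothetical helper, not part of KCL: decides when processRecords should
// checkpoint, based on records handled and time elapsed since the last checkpoint.
public class CheckpointPolicy {
    private final int maxRecords;
    private final long maxIntervalNanos;
    private int recordsSinceCheckpoint = 0;
    private long lastCheckpointNanos;

    CheckpointPolicy(int maxRecords, Duration maxInterval, long nowNanos) {
        this.maxRecords = maxRecords;
        this.maxIntervalNanos = maxInterval.toNanos();
        this.lastCheckpointNanos = nowNanos;
    }

    /** Registers a handled batch; returns true if either threshold is reached. */
    boolean shouldCheckpoint(int batchSize, long nowNanos) {
        recordsSinceCheckpoint += batchSize;
        return recordsSinceCheckpoint >= maxRecords
                || nowNanos - lastCheckpointNanos >= maxIntervalNanos;
    }

    /** Call after checkpoint() succeeds; resets both the count and the timer. */
    void checkpointed(long nowNanos) {
        recordsSinceCheckpoint = 0;
        lastCheckpointNanos = nowNanos;
    }

    public static void main(String[] args) {
        CheckpointPolicy policy = new CheckpointPolicy(100, Duration.ofSeconds(60), 0L);
        long t = 0L;
        System.out.println(policy.shouldCheckpoint(40, t)); // 40 records, 0s elapsed
        System.out.println(policy.shouldCheckpoint(70, t)); // 110 records total
        policy.checkpointed(t);
        t += Duration.ofSeconds(61).toNanos();
        System.out.println(policy.shouldCheckpoint(1, t));  // 61s since last checkpoint
    }
}
```

    Running main prints false, true, true. Because a checkpoint covers everything before it, only checkpoint once any failed records from those batches have been stored somewhere for reprocessing.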