Tags: amazon-web-services, amazon-kinesis, amazon-kcl, amazon-kinesis-kpl

AWS Kinesis KCL skips records added before startup


I have started using both the KPL and the KCL to exchange data between services. However, whenever the consumer service is offline, all data sent by the KPL during that time is lost. I only receive the chunks of data that were sent while the consumer service was up and its shardConsumer was ready. I need to resume from the last consumed point, or otherwise process the data that was left behind.

Here is my ShardProcessor code:

    @Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

And configuration code:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

Solution

  • There are two ways to address this. First, the problem.

    By default, the KCL is configured to start reading the stream at LATEST. This setting tells the stream reader to pick up the stream at the "current" timestamp, so it only sees records added after the consumer starts.

    In your case, the stream holds data that was put there before "now." To read that data, consider starting from the earliest data still in the stream. A stream created with default settings retains data for 24 hours.

    To read from the "beginning" of the stream, that is, from the oldest record still retained (up to 24 hours before you start the KCL application with the default retention), set the stream reader to TRIM_HORIZON. This setting is called initialPositionInStream. You can read about it here. The API documents three values for it: LATEST, TRIM_HORIZON, and AT_TIMESTAMP.
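    The difference between the two starting positions can be illustrated with a toy model. The sketch below does not use the KCL at all: ToyShard, InitialPosition, and consume are hypothetical names invented for this illustration, and the "shard" is just an in-memory list. It only shows which records a consumer with no stored checkpoint would see under each setting.

```java
import java.util.ArrayList;
import java.util.List;

public class InitialPositionDemo {

    /** Hypothetical stand-in for the two starting positions discussed above. */
    enum InitialPosition { LATEST, TRIM_HORIZON }

    /** Toy in-memory "shard": an append-only list of records (not a real Kinesis shard). */
    static class ToyShard {
        private final List<String> records = new ArrayList<>();

        void put(String data) {
            records.add(data);
        }

        /** Index at which a consumer without a checkpoint would start reading. */
        int startIndex(InitialPosition position) {
            // TRIM_HORIZON: oldest retained record; LATEST: just past the newest record.
            return position == InitialPosition.TRIM_HORIZON ? 0 : records.size();
        }

        List<String> readFrom(int index) {
            return new ArrayList<>(records.subList(index, records.size()));
        }
    }

    /** Simulates a producer writing before and after the consumer starts. */
    static List<String> consume(InitialPosition position) {
        ToyShard shard = new ToyShard();
        shard.put("sent-while-consumer-offline-1");
        shard.put("sent-while-consumer-offline-2");

        int start = shard.startIndex(position); // the consumer starts here

        shard.put("sent-after-consumer-started");
        return shard.readFrom(start);
    }

    public static void main(String[] args) {
        System.out.println("LATEST:       " + consume(InitialPosition.LATEST));
        System.out.println("TRIM_HORIZON: " + consume(InitialPosition.TRIM_HORIZON));
    }
}
```

    In this model, LATEST yields only the record sent after the consumer started, while TRIM_HORIZON yields all three. Note that in the real KCL the initial position is only consulted when the application has no checkpoint stored for a shard; once a checkpoint exists, reading resumes from it.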

    To solve your issue, the preferred method, as noted in the first link, is to add an entry to the properties file. If you're not using a properties file, you can set the initial position on the retrieval config that you pass to your Scheduler constructor:

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
            .initialPositionInStreamExtended(
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON))
            .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
    );
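
    For the properties-file route mentioned above, the entry would look roughly like the fragment below. This assumes a KCL MultiLangDaemon-style properties file; if your application loads its configuration differently, the key name may not apply.

```properties
# Start from the oldest retained record when the application has no checkpoint yet
initialPositionInStream = TRIM_HORIZON
```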
    

    One thing to keep in mind with this setting is what happens at startup when the stream already holds data and you start at TRIM_HORIZON. In this scenario, the RecordProcessor will iterate through the backlog of records as fast as it can. This could create performance issues at the Kinesis API, or even in downstream systems (wherever you're sending the data once the RecordProcessor has it).