I started using both the KPL and the KCL to exchange data between services. But whenever the consumer service is offline, all data sent by the KPL is lost forever: I only receive the chunks of data that were sent while the consumer service was up and its shardConsumer was ready. I need to resume from the last consumed point, or otherwise process the data that was left behind.
Here is my ShardProcessor code:
@Override
public void initialize(InitializationInput initializationInput) {
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    processRecordsInput.records()
            .forEach(record -> {
                // my logic
            });
}

@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}

@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
    try {
        shardEndedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        LOG.error("Kinesis error on Shard Ended", e);
    }
}

@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        LOG.error("Kinesis error on Shutdown Requested", e);
    }
}
And the configuration code:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
    Region region = Region.of(awsRegion);
    KinesisAsyncClient kinesisAsyncClient =
            KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
    DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();

    ConfigsBuilder configsBuilder =
            new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                    UUID.randomUUID().toString(), factory);

    Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
                    .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
    );

    Thread schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    schedulerThread.start();
}
There are two parts to address here. First, the problem.
By default, the KCL is configured to start reading the stream at LATEST. This setting tells the stream reader to pick up the stream at the "current" timestamp.
In your case, the stream contains data that was put there before "now." To read that data, you need to start from the earliest data still available in the stream. A stream with default settings retains data for 24 hours.
To read the data from the "beginning" of the stream, i.e. from up to 24 hours before you start the KCL application, set the stream reader to TRIM_HORIZON. This setting is called initialPositionInStream. You can read about it here; there are three different settings documented in the API.
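For reference, the three settings are LATEST, TRIM_HORIZON, and AT_TIMESTAMP, and in KCL 2.x each is wrapped in an InitialPositionInStreamExtended. A minimal sketch (the timestamp value below is only an illustration):
import java.util.Date;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;

// Start from the oldest record still retained by the stream (the "beginning").
InitialPositionInStreamExtended fromTrimHorizon =
        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);

// Default behaviour: start from records that arrive after the application starts.
InitialPositionInStreamExtended fromLatest =
        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);

// Start from a specific point in time within the retention window.
InitialPositionInStreamExtended fromTimestamp =
        InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date(1622505600000L));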
To solve your issue, the preferred method, as noted in the first link, is to add an entry to the properties file. If you're not using a properties file, you can simply add this to your Scheduler constructor call:
Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
                .initialPositionInStreamExtended(
                        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON))
                .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
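For completeness, if you do use a properties file (for example with the KCL MultiLangDaemon), the same thing is a single entry; the key name below comes from the KCL sample properties, so verify it against your KCL version:
initialPositionInStream = TRIM_HORIZON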
One thing to keep in mind with this setting is the startup behavior when the stream already contains data and you start at TRIM_HORIZON. In this scenario, the RecordProcessor will iterate through records as fast as it can. This could create performance issues at the Kinesis API, or even in downstream systems (wherever you're sending the data once the RecordProcessor has it), so keep that initial catch-up load in mind.
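If that initial burst turns out to be a problem, one option is to slow down the retrieval layer itself. The sketch below assumes the fluent PollingConfig setters maxRecords and idleTimeBetweenReadsInMillis from KCL 2.x (check them against your version); the values are arbitrary examples, not recommendations:
// Illustrative throttling of the polling retrieval path; values are examples only.
PollingConfig pollingConfig = new PollingConfig(streamName, kinesisAsyncClient)
        .maxRecords(500)                       // fewer records per GetRecords call
        .idleTimeBetweenReadsInMillis(1000L);  // longer pause between polls

Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
                .initialPositionInStreamExtended(
                        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON))
                .retrievalSpecificConfig(pollingConfig)
);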