I have an application that consumes records from Kinesis streams and processes them further, but its performance is quite low, so I am planning to migrate to a Kinesis enhanced fan-out consumer using KCL 2.x to improve it. As the AWS Kinesis docs for enhanced fan-out are quite confusing, can someone help me with an example of how to implement this consumer in my Java application?
Here is a very detailed example of a KCL 2.x consumer: https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html
The most important parts are:
- SampleRecordProcessor - implementation of the ShardRecordProcessor interface where the consumer processing logic lives.
- SampleRecordProcessorFactory
- Scheduler config:

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
    );

The important part above is that the default retrievalConfig() is used, which configures an enhanced fan-out consumer under the hood. The explicit way is the following:

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
            .retrievalSpecificConfig(
                new FanOutConfig(kinesisClient)
                    .streamName(streamName)
                    .applicationName(appName)
            )
            .maxListShardsRetryAttempts(maxListShardsRetryAttempts)
            .initialPositionInStreamExtended(
                InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
            )
    );
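
As for the processing logic in SampleRecordProcessor: in KCL 2.x each record handed to processRecords exposes its payload as a java.nio.ByteBuffer (via data() on the record), so the first step is usually decoding it. Below is a minimal sketch of that step, assuming the payload is UTF-8 text. The RecordPayloads class and its decodeUtf8 method are hypothetical helper names, not part of KCL, and the sample payload in main is made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of KCL): turns a record payload into a String.
final class RecordPayloads {
    private RecordPayloads() {}

    // Assumes the producer wrote UTF-8 text; adjust if your payload is binary.
    static String decodeUtf8(ByteBuffer data) {
        // Decode from a read-only view so the caller's buffer position is untouched
        // and the same record can be read again (for example, on a retry).
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for the ByteBuffer a Kinesis record would carry.
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(payload));
    }
}
```

Inside processRecords you would call something like RecordPayloads.decodeUtf8(record.data()) for each record and then pass the resulting string to your own processing code.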