Tags: java, sharding, amazon-kinesis, amazon-kcl, amazon-kinesis-agent

Consume records from specific shards under KCL 2.x (Kinesis)


I have a set of records in some specific shards of a Kinesis stream. I'm using a KCL 2.x consumer to read from Kinesis, but the consumer fetches records from every shard in the stream. Is there a way to specify shards (or their IDs) when configuring the ConfigsBuilder object or the KCL consumer, so that only records from the specified shards are consumed?

Sample Code :

configsBuilder = new ConfigsBuilder(
        streamName,
        applicationName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig());

// Start the Kinesis records consumer on a daemon thread.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();

Thanks in advance!


Solution

  • KCL 2.x provides a ShardPrioritization interface that lets you prioritize or filter shards:

    /**
     * Provides logic to prioritize or filter shards before their execution.
     */
    public interface ShardPrioritization {
    
        /**
         * Returns new list of shards ordered based on their priority.
         * Resulted list may have fewer shards compared to original list
         * 
         * @param original
         *            list of shards needed to be prioritized
         * @return new list that contains only shards that should be processed
         */
        List<ShardInfo> prioritize(List<ShardInfo> original);
    }
    

    That said, you can provide a ShardPrioritization implementation that keeps only the shards relevant to you.

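    After that, set your prioritizer on the coordinator config, and pass that same config object to the Scheduler instead of calling configsBuilder.coordinatorConfig() again:

    CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig()
            .shardPrioritization(new CustomShardsPrioritization());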
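
As a rough illustration of the filtering implementation described above, here is a minimal sketch that keeps only shards whose IDs appear in an allow-list. To stay runnable without the KCL on the classpath, it declares small stand-ins for ShardInfo and ShardPrioritization; in a real application you would drop those and implement the KCL interface quoted earlier (both types should live in software.amazon.kinesis.leases). The class name AllowListShardPrioritization and the shard IDs are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ShardFilterSketch {

    // Stand-in for the KCL ShardInfo class (assumption: only shardId() is needed here).
    static final class ShardInfo {
        private final String shardId;

        ShardInfo(String shardId) {
            this.shardId = shardId;
        }

        String shardId() {
            return shardId;
        }
    }

    // Stand-in mirroring the KCL ShardPrioritization interface quoted in the answer.
    interface ShardPrioritization {
        List<ShardInfo> prioritize(List<ShardInfo> original);
    }

    // Hypothetical implementation: returns only the shards named in the allow-list,
    // preserving the order of the original list.
    static final class AllowListShardPrioritization implements ShardPrioritization {
        private final Set<String> allowedShardIds;

        AllowListShardPrioritization(Set<String> allowedShardIds) {
            // Defensive copy so later changes to the caller's set have no effect.
            this.allowedShardIds = new HashSet<>(allowedShardIds);
        }

        @Override
        public List<ShardInfo> prioritize(List<ShardInfo> original) {
            return original.stream()
                    .filter(shard -> allowedShardIds.contains(shard.shardId()))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        ShardPrioritization prioritization = new AllowListShardPrioritization(
                new HashSet<>(Arrays.asList("shardId-000000000001")));

        List<ShardInfo> kept = prioritization.prioritize(Arrays.asList(
                new ShardInfo("shardId-000000000000"),
                new ShardInfo("shardId-000000000001")));

        List<String> keptIds = kept.stream()
                .map(ShardInfo::shardId)
                .collect(Collectors.toList());
        System.out.println(keptIds); // prints [shardId-000000000001]
    }
}
```

An instance of such a class would then take the place of CustomShardsPrioritization in the coordinator config shown above.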