Search code examples
javaspring-bootspring-cloud-streamamazon-kinesisamazon-kinesis-kpl

How to disable CloudWatch metrics for KPL/KCL with Spring Cloud Stream


I am using the Spring Cloud Stream Binder for Kinesis with KPL/KCL enabled. We would like to disable Cloudwatch metrics without having to manage the configuration of KPL and KCL ourselves (completely overriding the beans). We would like to use the same bean definition for the KinesisProducerConfiguration and each of the KinesisClientLibConfiguration besides the KinesisProducerConfiguration.setMetricsLevel() and KinesisClientLibConfiguration.withMetricsLevel(...) properties.

For reference, here is where the AWS beans are defined in the Spring Cloud Stream Kinesis Binder: KinesisBinderConfiguration.java

What would be the most effective way to do this?

Any help is appreciated! Thanks.


Solution

  • The framework does not provide any of the KinesisClientLibConfiguration. It is your project responsibility to expose such a bean and with whatever options you need: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties

    Starting with version 2.0.1, beans of KinesisClientLibConfiguration type can be provided in the application context to have a full control over Kinesis Client Library configuration options.

    The producer side indeed is covered by the KinesisProducerConfiguration bean in the KinesisBinderConfiguration:

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
    public KinesisProducerConfiguration kinesisProducerConfiguration() {
        KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
        kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
        kinesisProducerConfiguration.setRegion(this.region);
        return kinesisProducerConfiguration;
    }
    

    I don't see a big problem from here to declare such a bean in your own configuration with any additional properties you'd like to have, including the mentioned metrics.

    If this still is not OK for you, you can do something like this bean injection into your own bean and mutate it whatever way you want:

    @Bean
    String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration)  {
       kinesisProducerConfiguration.setMetricsLevel();
       return null;
    }
    

    UPDATE

    The consumer part:

    This is a bean based on default config instance for KCL we create internally:

    @Bean
    KinesisClientLibConfiguration kinesisClientLibConfiguration() {
        return new KinesisClientLibConfiguration(this.consumerGroup,
                                this.stream,
                                null,
                                null,
                                this.streamInitialSequence,
                                this.kinesisProxyCredentialsProvider,
                                null,
                                null,
                                KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
                                this.workerId,
                                KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
                                this.idleBetweenPolls,
                                false,
                                KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
                                KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
                                KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
                                new ClientConfiguration(),
                                new ClientConfiguration(),
                                new ClientConfiguration(),
                                this.consumerBackoff,
                                KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
                                KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
                                KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
                                null,
                                KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
                                KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
                                new SimpleRecordsFetcherFactory(),
                                DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
                                DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
                                DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
    }
    

    Whatever you see with this. has to be replaced with respective value from your env. Probably that KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE is what you are looking for in this case.

    The this.consumerGroup and this.stream must be same as in binding you want to configure consumer for.