I am using the Spring Cloud Stream Binder for Kinesis with KPL/KCL enabled. We would like to disable CloudWatch metrics without having to manage the KPL and KCL configuration ourselves (that is, without completely overriding the beans). We would like to keep the binder's bean definitions for the KinesisProducerConfiguration and each KinesisClientLibConfiguration, changing only the KinesisProducerConfiguration.setMetricsLevel() and KinesisClientLibConfiguration.withMetricsLevel(...) properties.
For reference, here is where the AWS beans are defined in the Spring Cloud Stream Kinesis Binder: KinesisBinderConfiguration.java
What would be the most effective way to do this?
Any help is appreciated! Thanks.
The framework does not provide any KinesisClientLibConfiguration beans. It is your project's responsibility to expose such a bean, with whatever options you need: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
Starting with version 2.0.1, beans of KinesisClientLibConfiguration type can be provided in the application context to have a full control over Kinesis Client Library configuration options.
The producer side is indeed covered by the KinesisProducerConfiguration bean in KinesisBinderConfiguration:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
    KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
    kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
    kinesisProducerConfiguration.setRegion(this.region);
    return kinesisProducerConfiguration;
}
I don't see a big problem with declaring such a bean in your own configuration, with any additional properties you'd like, including the metrics level you mention. Because the binder's bean is marked @ConditionalOnMissingBean, your own bean replaces it.
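As a minimal sketch of that first option, the configuration below declares its own KinesisProducerConfiguration with metrics switched off. The class name, the hard-coded region, and the presence of an AWSCredentialsProvider bean in the context are assumptions made for illustration; adjust them to your environment.

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; any @Configuration class in your application works.
@Configuration
public class KplMetricsConfig {

    // Assumes an AWSCredentialsProvider bean is available for injection.
    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration(AWSCredentialsProvider credentialsProvider) {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        config.setCredentialsProvider(credentialsProvider);
        config.setRegion("us-east-1"); // placeholder region, replace with your own
        config.setMetricsLevel("none"); // KPL accepts "none", "summary" or "detailed"
        return config;
    }
}
```

The trade-off is that credentials and region are now your responsibility, since the binder's own definition no longer applies.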
If that still doesn't work for you, you can inject the binder's bean into a bean of your own and mutate it however you want:
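@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration) {
    // KPL accepts "none", "summary" or "detailed" here.
    kinesisProducerConfiguration.setMetricsLevel("none");
    // This bean exists only to run the line above; its value is never used.
    return null;
}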
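A variation on the same idea, not something the binder itself prescribes, is a Spring BeanPostProcessor that adjusts the KinesisProducerConfiguration as soon as it is initialized, before it is injected anywhere else. This is only a sketch: the class and method names are hypothetical, and it assumes Spring 5 or later, where BeanPostProcessor methods have default implementations.

```java
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name, used for illustration only.
@Configuration
public class KplMetricsCustomizer {

    // Declared static so the post-processor is registered early in the context lifecycle.
    @Bean
    static BeanPostProcessor kplMetricsLevelPostProcessor() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) {
                if (bean instanceof KinesisProducerConfiguration) {
                    // Keep the binder's credentials and region, change only the metrics level.
                    ((KinesisProducerConfiguration) bean).setMetricsLevel("none");
                }
                return bean;
            }
        };
    }
}
```

Compared with a configurer bean that returns null, this does not depend on the order in which beans happen to be created.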
UPDATE
The consumer part:
This is a bean based on the default KCL config instance we create internally:
@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
    return new KinesisClientLibConfiguration(this.consumerGroup,
            this.stream,
            null,
            null,
            this.streamInitialSequence,
            this.kinesisProxyCredentialsProvider,
            null,
            null,
            KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
            this.workerId,
            KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
            this.idleBetweenPolls,
            false,
            KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(),
            new ClientConfiguration(),
            new ClientConfiguration(),
            this.consumerBackoff,
            KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
            KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
            null,
            KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
            new SimpleRecordsFetcherFactory(),
            DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
            DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
            DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
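Everything prefixed with this. has to be replaced with the respective value from your environment. Probably KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE is what you are looking for in this case; the withMetricsLevel(...) method from your question can also be called on the instance before it is returned.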
The this.consumerGroup and this.stream values must be the same as in the binding whose consumer you want to configure.
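For the consumer side, a shorter sketch is possible with the four-argument KinesisClientLibConfiguration constructor from KCL 1.x followed by withMetricsLevel(...). The class name, group, stream and worker id below are placeholders, and an AWSCredentialsProvider bean is assumed to exist. Note that every option not set here falls back to the KCL defaults rather than to the values the binder would have used.

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name, used for illustration only.
@Configuration
public class KclMetricsConfig {

    @Bean
    public KinesisClientLibConfiguration kinesisClientLibConfiguration(AWSCredentialsProvider credentialsProvider) {
        return new KinesisClientLibConfiguration(
                "my-group",   // placeholder: must match the consumer group of the binding
                "my-stream",  // placeholder: must match the stream of the binding
                credentialsProvider,
                "worker-1")   // placeholder worker id
                .withMetricsLevel(MetricsLevel.NONE); // disables CloudWatch metrics for this consumer
    }
}
```

One such bean is needed per consumer binding, each with its own group and stream.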