I am using the Spring Cloud Stream Kinesis binder (version 2.1.0).
For security reasons, I must have one set of credentials for Kinesis and another set of credentials for DynamoDB and CloudWatch.
Everything works fine if spring.cloud.stream.kinesis.binder.kplKclEnabled is set to false. But if it is set to true, I get this exception:
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found
The full stack trace is available at https://pastebin.com/bjvKSzrg
I would like to have KCL enabled, so does anybody know how to avoid this error?
I know the error happens because the user credentials for CloudWatch and DynamoDB don't "see" the Kinesis stream mentioned above. But why do they need to see it? Also, with KCL disabled it works as expected, so I don't see why it wouldn't work with KCL enabled.
Here is my properties file:
spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer
spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration
cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
The configuration class mentioned above:
@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {

    AwsProperties.Properties properties;

    public KinesisOutputConfiguration(AwsProperties awsProperties) {
        this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
    }

    // Kinesis client: uses the refreshing credentials.
    @Bean(destroyMethod = "shutdown")
    public AmazonKinesisAsync amazonKinesis() {
        RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed().getUrl(), this.properties.getHsdp().getClientId(),
                this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
                this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
                new RestTemplate());
        // Assumes RefreshingCredentials implements AWSCredentialsProvider.
        return AmazonKinesisAsyncClientBuilder.standard().withCredentials(refreshingCredentials).withRegion("eu-west-1").build();
    }

    // CloudWatch client: uses the static access key / secret key pair.
    @Bean(destroyMethod = "shutdown")
    public AmazonCloudWatchAsync cloudWatch() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }

    // DynamoDB client: uses the same static credentials as CloudWatch.
    @Bean(destroyMethod = "shutdown")
    @Primary
    public AmazonDynamoDBAsync dynamoDBAsync() {
        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
                this.properties.getSecretKey()));
        return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
    }
}
Your configuration is correct: if you need to use different credentials for those services, you do need to declare custom beans for them. DynamoDB and CloudWatch are required services for the Kinesis Client Library. The KCL uses DynamoDB, on the one hand, to track the offsets (checkpoints) for the stream shards and, on the other, to handle consumer instance changes in the cluster so that each shard is accessed exclusively; CloudWatch receives the KCL metrics. So the Kinesis resource does indeed have to be available to the DynamoDB and CloudWatch users.
See the Kinesis Client Library documentation for more information, or ask AWS support: there is nothing the Kinesis Binder can do for you on this matter.
https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html
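To make the two DynamoDB roles described above more concrete, here is a toy, in-memory sketch of a lease table: one row per shard, holding the current owner and the last checkpoint. It is only an illustration of the idea, not the KCL's implementation or schema; the class and method names (LeaseTableSketch, tryAcquire, checkpoint, release, checkpointOf) are hypothetical, and the real library works against a DynamoDB table and also handles lease expiry and stealing, which are left out here.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory stand-in for the lease table the KCL keeps in DynamoDB (hypothetical names). */
public class LeaseTableSketch {

    /** One row per shard. */
    static final class Lease {
        String owner;       // worker currently holding the shard, or null if free
        String checkpoint;  // last processed sequence number, or null if none yet
    }

    private final Map<String, Lease> leases = new HashMap<>();

    /** Claims the shard if it is free; returns true only if workerId owns it afterwards. */
    synchronized boolean tryAcquire(String shardId, String workerId) {
        Lease lease = leases.computeIfAbsent(shardId, id -> new Lease());
        if (lease.owner == null) {
            lease.owner = workerId;
        }
        return workerId.equals(lease.owner);
    }

    /** Records progress for a shard; only the current owner may checkpoint. */
    synchronized boolean checkpoint(String shardId, String workerId, String sequenceNumber) {
        Lease lease = leases.get(shardId);
        if (lease == null || !workerId.equals(lease.owner)) {
            return false;
        }
        lease.checkpoint = sequenceNumber;
        return true;
    }

    /** Gives the shard up (for example when a worker shuts down); the checkpoint is kept. */
    synchronized void release(String shardId, String workerId) {
        Lease lease = leases.get(shardId);
        if (lease != null && workerId.equals(lease.owner)) {
            lease.owner = null;
        }
    }

    /** Returns the last checkpoint for the shard, or null if there is none. */
    synchronized String checkpointOf(String shardId) {
        Lease lease = leases.get(shardId);
        return lease == null ? null : lease.checkpoint;
    }

    public static void main(String[] args) {
        LeaseTableSketch table = new LeaseTableSketch();
        System.out.println(table.tryAcquire("shardId-000", "worker-a")); // true
        System.out.println(table.tryAcquire("shardId-000", "worker-b")); // false: already owned
        table.checkpoint("shardId-000", "worker-a", "42");
        table.release("shardId-000", "worker-a");
        System.out.println(table.tryAcquire("shardId-000", "worker-b")); // true: lease was released
        System.out.println(table.checkpointOf("shardId-000"));           // 42
    }
}
```

The point of the sketch is that worker-b can pick up the shard from worker-a's checkpoint only because both read and write the same shared table, which is why the KCL cannot run without its DynamoDB client.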