I am trying to write a Kinesis consumer with Spring v3.4.2, spring-cloud-stream v4.2.0, spring-cloud-stream-binder-kinesis v4.0.4. Basically, this code should consume a Json from the Kinesis stream, map it into the designed object and then do other things. But unfortunately Spring cannot find my Kinesis Stream.
My application.properties contains:
spring.cloud.stream.bindings.input.destination=KinesisStreamName
My code looks like this:
@Configuration
public class KinesisConsumer {
@Bean
public Consumer<String> input(ObjectMapper mapper) {
return event -> {
try {
// Convert JSON string to Event object
Event obj = mapper.readValue(event, Event.class);
// other instructions with obj
} catch (Exception e) {
LOGGER.error("Error");
}
};
}
}
I've also tried this, as shown in this example:
@Configuration
public class KinesisConsumer {
@Bean
public Consumer<Event> input() {
return event -> {
try {
LOGGER.info("Received: {}", event);
// other instructions
} catch (Exception e) {
LOGGER.error("Error");
}
};
}
}
Here is what I get:
[o.s.c.s.b.k.p.KinesisStreamProvisioner,provisionKinesisConsumerDestination(),124] - Using Kinesis stream for inbound: input-in-0
[o.s.c.s.b.BindingService,rescheduleConsumerBinding(),212] - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: The stream [input-in-0] was not found and auto creation is disabled.
at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.createOrUpdate(KinesisStreamProvisioner.java:139)
at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionKinesisConsumerDestination(KinesisStreamProvisioner.java:127)
at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionConsumerDestination(KinesisStreamProvisioner.java:110)
at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionConsumerDestination(KinesisStreamProvisioner.java:57)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:529)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:103)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:192)
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:145)
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:99)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(Unknown Source)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
at com.enel.virtualentity.VirtualEntityApplication.main(VirtualEntityApplication.java:16)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64)
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40)
Caused by: software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException: Stream input-in-0 under account xxxxxxxxxxxx **(AWS account shown here is correct)** not found. (Service: Kinesis, Status Code: 400, Request ID: f5995fda-1543-083d-9585-60f285f9aae7, Extended Request ID: 9nN/FFBVi8ZFec1tsKLGHELha3AcVLTsNXkJPS+XdIbUZWsASyykPWRgCEBMy7WW4t803144naL3NcMTJyjcjumb0q0CpP4C)
at software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:137)
at software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:91)
at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.unmarshall(AwsJsonProtocolErrorUnmarshaller.java:95)
at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:71)
at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:42)
at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52)
at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:102)
at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:95)
at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52)
at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:92)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:135)
at software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher$BytesReadTracker.onComplete(BytesReadTrackingPublisher.java:74)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$DataCountingPublisher$1.onComplete(ResponseHandler.java:511)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:246)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:76)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:367)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.publishMessage(HandlerPublisher.java:402)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.flushBuffer(HandlerPublisher.java:338)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.receivedDemand(HandlerPublisher.java:291)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.access$200(HandlerPublisher.java:61)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:495)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Unknown Source)
Spring is expecting a stream name like input-in-0, as if nothing is set in the application.properties file. What am I doing wrong?
Thanks
Your config is:
spring.cloud.stream.bindings.input.destination=KinesisStreamName
However the binding name is exactly what you got in the error: input-in-0
.
So, if you change your config to this:
spring.cloud.stream.bindings.input-in-0.destination=KinesisStreamName
you should be fine.
See more info in docs: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html