Search code examples
springspring-cloud-streamamazon-kinesisspring-integration-awsspring-cloud-stream-binder-kinesis

Kinesis Stream is not found under AWS Account by Spring Kinesis binder


I am trying to write a Kinesis consumer with Spring v3.4.2, spring-cloud-stream v4.2.0, spring-cloud-stream-binder-kinesis v4.0.4. Basically, this code should consume a Json from the Kinesis stream, map it into the designed object and then do other things. But unfortunately Spring cannot find my Kinesis Stream.

My application.properties contains: spring.cloud.stream.bindings.input.destination=KinesisStreamName

My code looks like this:

@Configuration
public class KinesisConsumer {

    @Bean
    public Consumer<String> input(ObjectMapper mapper) {
        return event -> {
            try {
                // Convert JSON string to Event object
                Event obj = mapper.readValue(event, Event.class);

                // other instructions with obj
            } catch (Exception e) {
                LOGGER.error("Error");
            }
        };
    }
}

I've also tried this, as shown in this example:

@Configuration
public class KinesisConsumer {

    @Bean
    public Consumer<Event> input() {
        return event -> {
            try {
                LOGGER.info("Received: {}", event);

                // other instructions
            } catch (Exception e) {
                LOGGER.error("Error");
            }
        };
    }
}

Here is what I get:

[o.s.c.s.b.k.p.KinesisStreamProvisioner,provisionKinesisConsumerDestination(),124] - Using Kinesis stream for inbound: input-in-0
[o.s.c.s.b.BindingService,rescheduleConsumerBinding(),212] - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: The stream [input-in-0] was not found and auto creation is disabled.
        at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.createOrUpdate(KinesisStreamProvisioner.java:139)
        at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionKinesisConsumerDestination(KinesisStreamProvisioner.java:127)
        at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionConsumerDestination(KinesisStreamProvisioner.java:110)
        at org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner.provisionConsumerDestination(KinesisStreamProvisioner.java:57)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:529)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:103)
        at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:144)
        at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:192)
        at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:145)
        at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:99)
        at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
        at java.base/java.util.LinkedHashMap$LinkedValues.forEach(Unknown Source)
        at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
        at java.base/java.lang.Iterable.forEach(Unknown Source)
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
        at com.enel.virtualentity.VirtualEntityApplication.main(VirtualEntityApplication.java:16)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102)
        at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64)
        at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40)
Caused by: software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException: Stream input-in-0 under account xxxxxxxxxxxx **(AWS account shown here is correct)** not found. (Service: Kinesis, Status Code: 400, Request ID: f5995fda-1543-083d-9585-60f285f9aae7, Extended Request ID: 9nN/FFBVi8ZFec1tsKLGHELha3AcVLTsNXkJPS+XdIbUZWsASyykPWRgCEBMy7WW4t803144naL3NcMTJyjcjumb0q0CpP4C)
        at software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:137)
        at software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException$BuilderImpl.build(ResourceNotFoundException.java:91)
        at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.unmarshall(AwsJsonProtocolErrorUnmarshaller.java:95)
        at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:71)
        at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonProtocolErrorUnmarshaller.handle(AwsJsonProtocolErrorUnmarshaller.java:42)
        at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52)
        at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:102)
        at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:95)
        at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52)
        at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:92)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:135)
        at software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher$BytesReadTracker.onComplete(BytesReadTrackingPublisher.java:74)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$DataCountingPublisher$1.onComplete(ResponseHandler.java:511)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:246)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:76)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:367)
        at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.publishMessage(HandlerPublisher.java:402)
        at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.flushBuffer(HandlerPublisher.java:338)
        at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.receivedDemand(HandlerPublisher.java:291)
        at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.access$200(HandlerPublisher.java:61)
        at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher$ChannelSubscription$1.run(HandlerPublisher.java:495)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Unknown Source)

Spring is expecting a stream name like input-in-0, as if nothing is set in the application.properties file. What am I doing wrong?

Thanks


Solution

  • Your config is:

    spring.cloud.stream.bindings.input.destination=KinesisStreamName
    

    However the binding name is exactly what you got in the error: input-in-0.

    So, if you change your config to this:

    spring.cloud.stream.bindings.input-in-0.destination=KinesisStreamName
    

    you should be fine.

    See more info in docs: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html