amazon-web-services amazon-kinesis amazon-kcl

Timeout issues when using Amazon Kinesis Client Library resulting in dropped records


I am facing the following issue when running a KCL consumer against a LocalStack instance:

[INFO ] 2021-07-15 17:30:34.019 [software.amazon.kinesis.coordinator.DiagnosticEventLogger][task-1] DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
[WARN ] 2021-07-15 17:30:34.190 [software.amazon.kinesis.lifecycle.ShardConsumerSubscriber][ShardRecordProcessor-0000] ShardConsumerSubscriber - shardId-000000000000: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittent ReadTimeout warnings. Last successful request details -- request id - UNKNOWN, timestamp - 2021-07-15T17:30:20.005207Z
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:343) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$800(FanOutRecordsPublisher.java:68) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.executeExceptionOccurred(FanOutRecordsPublisher.java:802) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:778) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$80(DefaultKinesisAsyncClient.java:2682) ~[kinesis-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:54) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$notifyError$5(ResponseHandler.java:309) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$500(ResponseHandler.java:71) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.notifyError(ResponseHandler.java:307) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onError(ResponseHandler.java:283) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.exceptionCaught(HandlerPublisher.java:473) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler.exceptionCaught(Http2StreamExceptionHandler.java:53) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.UnusedChannelExceptionHandler.exceptionCaught(UnusedChannelExceptionHandler.java:52) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:504) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:476) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: null
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) ~[sdk-core-2.16.98.jar:?]
    ... 45 more
Caused by: io.netty.handler.timeout.ReadTimeoutException

The KinesisAsyncClient is created with:

    KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder()
            .region(region);
    if (endpoint != null) {
        log.debug("AWS Endpoint for Kinesis Client is set to {}", endpoint);
        kinesisAsyncClientBuilder.endpointOverride(URI.create(endpoint));
    }
    return KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);

and passed into the Scheduler, which runs on a separate thread. When running integration tests, I insert records one by one into the LocalStack Kinesis stream (with a 15-second delay between insertions), but only some of the records are actually processed: sometimes 4 of the 6, sometimes only 2 of the 6.

I've tried using ClientOverrideConfiguration with increased timeouts, as well as increasing the timeouts on the HttpClient, but nothing changed. Has anyone faced this issue before? I saw a number of similar questions posted online, but each got only a single official response, and none of the solutions in those responses worked.



Solution

  • Turns out this was much simpler than I thought. I failed to correctly set up the Scheduler.

        return new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
        );
    

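    If you run into the same error I did, make sure the last line is included when you build the Scheduler. I did not have it when I was getting this error, and once I added it the errors disappeared. Judging by the FanOutRecordsPublisher and subscribeToShard frames in the stack trace, without that line the consumer was retrieving records via enhanced fan-out; PollingConfig switches retrieval to polling.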

    .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
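
For readers who want to see where `configsBuilder` and `kinesisAsyncClient` come from, here is a minimal configuration sketch of the whole wiring, not the original project's code. It assumes KCL 2.x and the AWS SDK for Java 2.x are on the classpath and that credentials are resolvable through the default provider chain. The class name `PollingSchedulerSketch`, the stream name, the application name, the region, and the endpoint `http://localhost:4566` are all placeholders chosen for illustration.

```java
import java.net.URI;
import java.util.UUID;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

public final class PollingSchedulerSketch {

    private PollingSchedulerSketch() {
    }

    // Builds a Scheduler that retrieves records by polling instead of enhanced fan-out.
    public static Scheduler buildScheduler(ShardRecordProcessorFactory processorFactory) {
        // Placeholder values (assumptions, not taken from the original project).
        String streamName = "my-stream";
        String applicationName = "my-consumer";
        Region region = Region.US_EAST_1;
        URI endpoint = URI.create("http://localhost:4566");

        KinesisAsyncClient kinesisAsyncClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().region(region).endpointOverride(endpoint));

        // KCL also needs DynamoDB (lease table) and CloudWatch (metrics) clients.
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder()
                .region(region)
                .endpointOverride(endpoint)
                .build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder()
                .region(region)
                .endpointOverride(endpoint)
                .build();

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamName,
                applicationName,
                kinesisAsyncClient,
                dynamoClient,
                cloudWatchClient,
                UUID.randomUUID().toString(), // worker identifier
                processorFactory);

        return new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                // The line the answer is about: select polling retrieval explicitly.
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient)));
    }

    // Scheduler implements Runnable, so it can be started on its own thread.
    public static Thread start(Scheduler scheduler) {
        Thread thread = new Thread(scheduler, "kcl-scheduler");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }
}
```

A caller would pass its own `ShardRecordProcessorFactory` to `buildScheduler(...)` and hand the result to `start(...)`. Reusing the same `kinesisAsyncClient` for both `ConfigsBuilder` and `PollingConfig` mirrors the snippet above.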