
Spring AWS Kinesis Binder: issues acquiring and releasing locks in DynamoDB while consuming messages


Sometimes, when we stop the application abruptly, an exception is logged saying that unlocking has failed. After that, the same consumer group never receives messages again, while other groups continue to receive them.

I am using the snapshot version of the AWS Kinesis binder.

This is the error logged when the application is stopped:

    2018-07-19 22:21:21.371 ERROR 60981 --- [s-shard-locks-1] a.i.k.KinesisMessageDrivenChannelAdapter : Error during unlocking: DynamoDbLock [lockKey=aaaa:myStream:shardId-000000000000,lockedAt=2018-07-19@22:21:11.145, lockItem=null]

    org.springframework.dao.DataAccessResourceFailureException: Failed to release lock at aaaa:myStream:shardId-000000000000; nested exception is java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock$$Lambda$421/685035750@7835c79e rejected from java.util.concurrent.ThreadPoolExecutor@650635c5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
        at org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock.unlock(DynamoDbLockRegistry.java:538) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumerManager.run(KinesisMessageDrivenChannelAdapter.java:1250) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_172]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_172]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172]
    Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock$$Lambda$421/685035750@7835c79e rejected from java.util.concurrent.ThreadPoolExecutor@650635c5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) [na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) [na:1.8.0_172]
        at org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock.unlock(DynamoDbLockRegistry.java:529) ~[spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        ... 6 common frames omitted
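
For context, the `RejectedExecutionException` at the root of this trace is what a `ThreadPoolExecutor` throws by default when a task is submitted after the pool has been shut down, which matches the `[Terminated, ...]` state printed in the message. The following is a minimal JDK-only sketch of that behavior. The class name `RejectedUnlockSketch` and the helper `trySubmitUnlock` are hypothetical names for illustration; this is not the adapter's or the lock registry's actual code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedUnlockSketch {

    // Hypothetical helper: hands an "unlock" task to the executor and
    // reports whether the executor accepted it.
    static boolean trySubmitUnlock(ExecutorService executor, Runnable unlockTask) {
        try {
            executor.execute(unlockTask);
            return true;
        }
        catch (RejectedExecutionException ex) {
            // With the default AbortPolicy, a shut-down pool rejects new tasks.
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Simulates the executor being stopped before the unlock is attempted.
        executor.shutdown();
        boolean accepted = trySubmitUnlock(executor, () -> System.out.println("unlocking"));
        System.out.println("unlock task accepted: " + accepted);
    }
}
```

In this sketch the unlock task never runs, so whatever lock record it was meant to remove would stay in place until something else clears it.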

Then, when we start the application, this is the error we get:

    2018-07-19 22:15:25.912  INFO 60969 --- [       Thread-3] ConfigServletWebServerApplicationContext : Closing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@62150f9e: startup date [Thu Jul 19 22:14:26 IST 2018]; root of context hierarchy
    2018-07-19 22:15:25.914  INFO 60969 --- [       Thread-3] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@29b40b3: startup date [Thu Jul 19 22:14:33 IST 2018]; parent: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@62150f9e
    2018-07-19 22:15:25.924  INFO 60969 --- [b-lock-client-1] c.a.s.d.AmazonDynamoDBLockClient         : Heartbeat thread recieved interrupt, exiting run() (possibly exiting thread)

    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) [na:1.8.0_172]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient.run(AmazonDynamoDBLockClient.java:955) ~[dynamodb-lock-client-1.0.0.jar:na]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_172]

    2018-07-19 22:15:25.928  INFO 60969 --- [       Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147482647
    2018-07-19 22:15:25.929  INFO 60969 --- [       Thread-3] a.i.k.KinesisMessageDrivenChannelAdapter : stopped KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='myStream', shard='shardId-000000000000', reset=false}], consumerGroup='aaaa'}
    2018-07-19 22:15:25.930  INFO 60969 --- [       Thread-3] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.myStream.aaaa.errors' has 1 subscriber(s).
    2018-07-19 22:15:25.931  INFO 60969 --- [       Thread-3] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.myStream.aaaa.errors' has 0 subscriber(s).
    2018-07-19 22:15:25.931  WARN 60969 --- [s-shard-locks-1] c.a.s.d.AmazonDynamoDBLockClient         : Could not acquire lock because of a client side failure in talking to DDB

    com.amazonaws.AbortedException: 
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:795) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:3452) ~[aws-java-sdk-dynamodb-1.11.336.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3428) ~[aws-java-sdk-dynamodb-1.11.336.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeGetItem(AmazonDynamoDBClient.java:1789) ~[aws-java-sdk-dynamodb-1.11.336.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.getItem(AmazonDynamoDBClient.java:1764) ~[aws-java-sdk-dynamodb-1.11.336.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient.readFromDynamoDB(AmazonDynamoDBLockClient.java:997) [dynamodb-lock-client-1.0.0.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient.getLockFromDynamoDB(AmazonDynamoDBLockClient.java:743) [dynamodb-lock-client-1.0.0.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient.acquireLock(AmazonDynamoDBLockClient.java:402) [dynamodb-lock-client-1.0.0.jar:na]
        at com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient.tryAcquireLock(AmazonDynamoDBLockClient.java:567) [dynamodb-lock-client-1.0.0.jar:na]
        at org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock.doLock(DynamoDbLockRegistry.java:504) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock.tryLock(DynamoDbLockRegistry.java:478) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.integration.aws.lock.DynamoDbLockRegistry$DynamoDbLock.tryLock(DynamoDbLockRegistry.java:451) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumerManager.lambda$run$0(KinesisMessageDrivenChannelAdapter.java:1198) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at java.util.Collection.removeIf(Collection.java:414) ~[na:1.8.0_172]
        at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumerManager.run(KinesisMessageDrivenChannelAdapter.java:1191) [spring-integration-aws-2.0.0.BUILD-SNAPSHOT.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_172]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_172]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_172]
    Caused by: java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) ~[na:1.8.0_172]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doPauseBeforeRetry(AmazonHttpClient.java:1671) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.pauseBeforeRetry(AmazonHttpClient.java:1645) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1191) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-core-1.11.336.jar:na]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.336.jar:na]
        ... 22 common frames omitted

Solution

  • I would say this is normal behavior for any distributed lock implementation: when the thread holding the lock is killed, there is no hook to release the lock at the DB level.

    You need to study the leaseDuration property of the AmazonDynamoDBLockClient: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/

    Note, though, that both of the stack traces you show are from the application stopping...

    Any chance you could put together a simple project for us to play with?

    UPDATE

    The problem has been fixed via: https://github.com/spring-projects/spring-integration-aws/commit/dd52e8f304d20bbdc46d3f9da77e26dd7977f8b7

    Thank you again for your good report!
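
To illustrate the leaseDuration point in the answer above, here is a simplified in-memory model of a lease-based lock. It is a sketch under stated assumptions, not the AmazonDynamoDBLockClient implementation: the real client stores lock items in DynamoDB and uses heartbeats, and its exact expiry mechanics differ. The class `LeaseLockSketch`, its methods, the owner names, and the 10-second lease are all hypothetical. The idea it shows is only that a lock abandoned by a crashed holder becomes available again once its lease runs out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LeaseLockSketch {

    // One lock record: who holds it and when the lease runs out.
    static final class Lease {
        final String owner;
        final long expiresAtMillis;

        Lease(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentMap<String, Lease> table = new ConcurrentHashMap<>();
    private final long leaseDurationMillis;

    LeaseLockSketch(long leaseDurationMillis) {
        this.leaseDurationMillis = leaseDurationMillis;
    }

    // Succeeds if the key is free, already held by this owner,
    // or the previous holder's lease has expired.
    boolean tryAcquire(String key, String owner, long nowMillis) {
        Lease fresh = new Lease(owner, nowMillis + leaseDurationMillis);
        Lease result = table.compute(key, (k, current) ->
                (current == null
                        || current.owner.equals(owner)
                        || current.expiresAtMillis <= nowMillis) ? fresh : current);
        return result == fresh;
    }

    // Removes the record only if this owner still holds it.
    boolean release(String key, String owner) {
        Lease current = table.get(key);
        return current != null && current.owner.equals(owner) && table.remove(key, current);
    }

    public static void main(String[] args) {
        LeaseLockSketch locks = new LeaseLockSketch(10_000);
        String key = "aaaa:myStream:shardId-000000000000";

        System.out.println(locks.tryAcquire(key, "instance-1", 0));      // true
        // instance-1 is killed here and never calls release().
        System.out.println(locks.tryAcquire(key, "instance-2", 5_000));  // false, lease still valid
        System.out.println(locks.tryAcquire(key, "instance-2", 10_000)); // true, lease expired
    }
}
```

In this model, a shorter lease means a restarted consumer can take the shard lock back sooner after an abrupt stop, at the cost of the holder having to renew it more often.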