
FlinkKinesisConsumer does not retry on NoHttpResponseException?


(Apache Flink 1.8 on AWS EMR release label 5.28.x)

Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use the FlinkKinesisConsumer to read the Kinesis stream. Our application occasionally (once every couple of days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.

Looking into the codebase, I found that 'ProvisionedThroughputExceededException' is the only exception type that is retried.

1. Why is a transient HTTP response exception not retried by the Kinesis connector?
2. Is there a way to pass in a retry configuration that will retry on these errors?

As a side note, we set the following restart strategy:

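    env.setRestartStrategy(RestartStrategies.failureRateRestart(
        12,                                                              // max failures per interval
        org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),  // failure-rate measuring interval
        org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)  // delay between restart attempts
    ));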
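For reference, a failure-rate strategy configured like this should tolerate up to 12 failures within a 60-minute window (waiting 300 s before each restart) and fail the job for good on the next one. The hypothetical `FailureRateModel` below is a simplified sketch of only that counting rule, using a sliding window of failure timestamps; it is not Flink's implementation and does not model the restart delay.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of a failure-rate restart decision:
 * the job may restart unless more than maxFailures failures fall inside
 * a sliding window of intervalMillis. Not Flink's actual implementation.
 */
final class FailureRateModel {
    private final int maxFailures;
    private final long intervalMillis;
    private final Deque<Long> failures = new ArrayDeque<>();

    FailureRateModel(int maxFailures, long intervalMillis) {
        this.maxFailures = maxFailures;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure at nowMillis; returns true if the job may be restarted. */
    boolean onFailure(long nowMillis) {
        // Forget failures that are older than the measuring interval.
        while (!failures.isEmpty() && nowMillis - failures.peekFirst() > intervalMillis) {
            failures.pollFirst();
        }
        failures.addLast(nowMillis);
        // Up to maxFailures failures per interval are tolerated; one more fails the job.
        return failures.size() <= maxFailures;
    }
}
```

With one failure every couple of days the window never comes close to filling, so under this configuration each such error should lead to a restart rather than a terminal job failure; the cost is the restart itself.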

Full stack trace of the exception:

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • KinesisProxy supports retrying exceptions, and the retry behavior can be controlled with the settings mentioned in the previous answer. Not all exceptions are retried, however, and the default whitelist does not cover all the transient issues that typically occur with the Kinesis service. Over time, we customized the proxy as follows to arrive at a stable production setup:

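      import java.net.SocketTimeoutException;
      import java.net.UnknownHostException;
      import java.util.Properties;
      import javax.net.ssl.SSLHandshakeException;

      // Package names as in the connector sources; adjust them if your build relocates (shades) these classes.
      import com.amazonaws.AmazonServiceException;
      import com.amazonaws.SdkClientException;
      import com.amazonaws.services.kinesis.model.KMSThrottlingException;
      import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
      import org.apache.http.NoHttpResponseException;
      import org.apache.http.conn.ConnectTimeoutException;

      // Class name is illustrative; any subclass of KinesisProxy can carry this override.
      public class RetryingKinesisProxy extends KinesisProxy {
        public RetryingKinesisProxy(Properties configProps) {
          super(configProps);
        }

        @Override
        protected boolean isRecoverableSdkClientException(SdkClientException ex) {
          if (ex instanceof KMSThrottlingException) {
            // not handled in KinesisProxy in 1.5.x
            return true;
          } else if (ex instanceof AmazonServiceException) {
            return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
          }
          // Transient network errors arrive wrapped as the cause of an SdkClientException.
          Throwable cause = ex.getCause();
          return cause instanceof SocketTimeoutException
              || cause instanceof NoHttpResponseException
              || cause instanceof ConnectTimeoutException
              || cause instanceof UnknownHostException
              || cause instanceof SSLHandshakeException;
        }
      }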
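To show how retry settings and a classifier like the override above fit together, here is a minimal, JDK-only sketch of a GetRecords-style retry loop: it retries only errors the classifier deems recoverable, caps the number of retries, and sleeps for a "full jitter" exponential backoff between attempts. The class `JitterRetry` and its `hasTransientCause` method are hypothetical. The property keys are assumed to be the `flink.shard.getrecords.*` keys from `ConsumerConfigConstants` in the Flink Kinesis connector, and the fallback values are placeholders; check both against your connector version. The Apache HttpClient exceptions are left out of the classifier only to keep the sketch dependency-free.

```java
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import javax.net.ssl.SSLHandshakeException;

/** Hypothetical helper: bounded retries with full-jitter exponential backoff. */
final class JitterRetry {
    // Assumed to match ConsumerConfigConstants.SHARD_GETRECORDS_* keys; verify for your version.
    static final String MAX_RETRIES = "flink.shard.getrecords.maxretries";
    static final String BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
    static final String BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
    static final String BACKOFF_EXPCONST = "flink.shard.getrecords.backoff.expconst";

    private final int maxRetries;
    private final long baseMillis;
    private final long maxMillis;
    private final double power;
    private final Random random = new Random();

    JitterRetry(Properties props) {
        // Fallback values are placeholders for this sketch, not authoritative connector defaults.
        this.maxRetries = Integer.parseInt(props.getProperty(MAX_RETRIES, "3"));
        this.baseMillis = Long.parseLong(props.getProperty(BACKOFF_BASE, "300"));
        this.maxMillis = Long.parseLong(props.getProperty(BACKOFF_MAX, "1000"));
        this.power = Double.parseDouble(props.getProperty(BACKOFF_EXPCONST, "1.5"));
    }

    /** Full jitter: a uniformly random delay in [0, min(max, base * power^attempt)). */
    long backoffMillis(int attempt) {
        double cap = Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (random.nextDouble() * cap);
    }

    /** JDK-only stand-in for the cause checks in the override above. */
    static boolean hasTransientCause(Exception e) {
        Throwable cause = e.getCause();
        return cause instanceof SocketTimeoutException
            || cause instanceof UnknownHostException
            || cause instanceof SSLHandshakeException;
    }

    /** Runs action, retrying recoverable failures at most maxRetries times. */
    <T> T call(Callable<T> action, Predicate<Exception> isRecoverable) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                // Non-recoverable errors and an exhausted retry budget propagate to the caller.
                if (!isRecoverable.test(e) || attempt >= maxRetries) {
                    throw e;
                }
                Thread.sleep(backoffMillis(attempt++));
            }
        }
    }
}
```

In this model the classifier only decides which errors are eligible for a retry, while the retry count and backoff come from the properties. That split is why widening the classifier, as in the override above, matters: an error it rejects is thrown on the first attempt no matter how high the retry count is set.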