
Flink savepoint is declined


I'm trying to take a savepoint of a job for which I implemented a custom parallel socket source. The source looks similar to this:

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    // Each parallel subtask connects to the host/port pair at its own index.
    int idx = getRuntimeContext().getIndexOfThisSubtask();
    // Hosts and ports are colon-separated lists taken from the job config.
    String[] hosts = (config.hostsStr).split(":");
    String[] portStrArr = (config.portsStr).split(":");
    int[] ports = new int[portStrArr.length];
    for (int i = 0; i < portStrArr.length; i++) {
        ports[i] = Integer.parseInt(portStrArr[i]);
    }
    Socket s = new Socket(hosts[idx], ports[idx]);
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
    // Emit one record per line until the source is cancelled.
    while (running) {
        String str = in.readLine();
        sourceContext.collect(str);
    }
    sourceContext.close();
}

@Override
public void cancel() {
    running = false;
}

On the cluster, the exception looks like this:

flink-1.1.3/bin//flink cancel -s hdfs://flink-master:19000/flink-checkpoints a18499a80099045eb5120ecacdabd421
Retrieving JobManager.
Using address flink-master/10.0.0.16:6123 to connect to JobManager.
Cancelling job a18499a80099045eb5120ecacdabd421 with savepoint to hdfs://flink-master:19000/flink-checkpoints.

java.lang.Exception: Canceling the job with ID a18499a80099045eb5120ecacdabd421 failed.
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.Exception: Failed to trigger savepoint.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629)
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:245)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
Suppressed: java.lang.IllegalArgumentException: Self-suppression not permitted
    at java.lang.Throwable.addSuppressed(Throwable.java:1043)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:207)
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:150)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:281)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:888)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:813)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[CIRCULAR REFERENCE:java.io.EOFException: Premature EOF: no length prefix available]

On my local machine, the savepoint is declined with the following exception:

Cancelling job 4c99e0220c8c4683d1287269073b5c2c with savepoint to savepoints/.
java.lang.Exception: Canceling the job with ID 4c99e0220c8c4683d1287269073b5c2c failed.
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: java.lang.Exception: Failed to trigger savepoint.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629)
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
    at akka.dispatch.OnComplete.internal(Future.scala:247)
    at akka.dispatch.OnComplete.internal(Future.scala:245)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Checkpoint was declined (tasks not ready)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortDeclined(PendingCheckpoint.java:510)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:735)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply$mcV$sp(JobManager.scala:1491)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    ... 6 more

Is this happening because my source cannot be stopped properly, so the checkpoint never takes place? On the cluster, it does report that the savepoint succeeded and returns its location, but there is no file at that path.


Solution

  • Judging from the source function excerpt, it looks mostly fine. However, you should emit elements while holding the checkpoint lock. Otherwise you might run into problems when an element is emitted at the same moment a checkpoint is triggered. Synchronizing on the object returned by SourceContext#getCheckpointLock ensures that these two operations don't happen concurrently.

    The first error looks as if there is a problem on the HDFS side. Could you check the HDFS logs for anything suspicious? Perhaps the DataNodes ran out of disk space.

    The second exception indicates that something went wrong while taking the checkpoint. The JobManager log should contain a statement explaining why the checkpoint failed. It should have the format: Discarding checkpoint CHECKPOINT_ID because of checkpoint decline from task EXECUTION_ID : REASON.
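A minimal sketch of the checkpoint-lock pattern described in the answer. To keep it compilable without Flink on the classpath, it assumes a hypothetical `Context` interface standing in for `SourceFunction.SourceContext`, reduced to the two methods the pattern needs (`collect` and `getCheckpointLock`). It reads from a `StringReader` instead of a socket, and the `linesEmitted` counter and `snapshot` method are illustrative stand-ins for whatever state a real source would checkpoint. It also stops when `readLine()` returns `null`, which happens when the peer closes the connection.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class CheckpointLockSketch {

    // Hypothetical stand-in for Flink's SourceFunction.SourceContext,
    // reduced to the two methods this pattern uses.
    interface Context<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    // Test context that records emitted elements; the lock is a plain Object.
    static final class ListContext implements Context<String> {
        final List<String> emitted = new ArrayList<>();
        private final Object lock = new Object();

        @Override public void collect(String element) { emitted.add(element); }
        @Override public Object getCheckpointLock() { return lock; }
    }

    private volatile boolean running = true;
    // Illustrative source state that a checkpoint would snapshot.
    private long linesEmitted = 0;

    // Mirrors the body of run(): emit each line while holding the checkpoint
    // lock, so a snapshot never sees an element emitted but not yet counted.
    void run(BufferedReader in, Context<String> ctx) throws IOException {
        String line;
        // readLine() returns null once the other side closes the stream.
        while (running && (line = in.readLine()) != null) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(line);
                linesEmitted++;
            }
        }
    }

    // What a snapshot would do: read the state under the same lock.
    long snapshot(Context<String> ctx) {
        synchronized (ctx.getCheckpointLock()) {
            return linesEmitted;
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) throws IOException {
        CheckpointLockSketch source = new CheckpointLockSketch();
        ListContext ctx = new ListContext();
        source.run(new BufferedReader(new StringReader("a\nb\nc\n")), ctx);
        System.out.println(ctx.emitted + " " + source.snapshot(ctx)); // prints "[a, b, c] 3"
    }
}
```

In the real source, the corresponding loop body would be `synchronized (sourceContext.getCheckpointLock()) { sourceContext.collect(str); }`, which holds the same lock the runtime uses when it triggers a checkpoint for the source.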