Search code examples
apache-flinkflink-streaming

Getting Exception while inspecting flink savepoint using state processor api


I am getting an Exception in thread "main" java.lang.IllegalAccessError: class org.apache.flink.state.api.runtime.SavepointLoader tried to access protected method org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader and org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage are in unnamed module of loader 'app')

Using flink 1.8. Using below maven repo :

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api_2.12</artifactId>
      <version>1.9.1</version>
    </dependency>

Source code snippet

        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
        savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));

Getting exception on second line which calls below function

    public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
    org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
    ...
    ...
}

Which calls below function :

    package org.apache.flink.state.api.runtime;

    public static Savepoint loadSavepoint(String savepointPath) throws IOException {
        CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
            .resolveCheckpointPointer(savepointPath);

        try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
            return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
        }
    }

which calls below function :

    package org.apache.flink.runtime.state.filesystem;

    protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        checkNotNull(checkpointPointer, "checkpointPointer");
        checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
       ...
       ...
}

If we see carefully, protected function of different package is called here. Is this a bug in flink maven repo or it's me using it wrong way ? Is there any other way to deserialize or read flink savepoint and checkpoint ?


Solution

  • There seems to be an dependency version mismatch for your flink.

    Add the below dependencies to the pom.xml and build again, also remove the old version dependency of the flink-clients from same file.

    <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.9.1</version>
    </dependency>