Tags: google-cloud-storage, apache-flink, google-cloud-dataproc

Flink checkpoints to Google Cloud Storage


I am trying to configure checkpoints for Flink jobs in GCS. Everything works fine if I run a test job locally (no Docker or cluster setup), but it fails with an error if I run it via docker-compose or a cluster setup and deploy the fat jar with the jobs through the Flink dashboard.

Any thoughts on this? Thanks!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

The environment configuration looks like this:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);

// Checkpoint every 10 s, keep at least 5 s pause between checkpoints, exactly-once mode
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);
checkpointConfig.setCheckpointInterval(10000);
checkpointConfig.setMinPauseBetweenCheckpoints(5000);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// RocksDB state backend with incremental checkpoints enabled, writing to GCS
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
        String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
env.setStateBackend((StateBackend) rocksDBStateBackend);

Here is my core-site.xml file:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>
</configuration>
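
For reference, a minimal flink-conf.yaml sketch of how Flink can be pointed at the directory holding this core-site.xml (the same fs.hdfs.hadoopconf option appears in the solution below; the /opt/flink/etc-hadoop path is just an assumed mount point):

    # flink-conf.yaml -- directory containing core-site.xml (assumed mount point)
    fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/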

Dependency on the gcs-connector:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

UPDATE:

After some manipulation of the dependencies, I've been able to write checkpoints. My current setup is:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

I also switched the Flink image to flink:1.5.2-hadoop28.
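
A minimal docker-compose sketch of how that image switch might look, assuming the stock docker-flink entrypoint (service names, ports, and the etc-hadoop mount are assumptions):

    version: "2.1"
    services:
      jobmanager:
        image: flink:1.5.2-hadoop28
        command: jobmanager
        ports:
          - "8081:8081"
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
        volumes:
          - ./etc-hadoop:/opt/flink/etc-hadoop   # assumed location of core-site.xml and key.json
      taskmanager:
        image: flink:1.5.2-hadoop28
        command: taskmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
        volumes:
          - ./etc-hadoop:/opt/flink/etc-hadoop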

Unfortunately, I am still not able to read checkpoint data, as my job always fails while restoring state with this error:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

I believe it's going to be the last error...


Solution

  • Finally I found the solution here:

    You must create your own image and put the gcs-connector into the lib directory. Otherwise you'll always run into classloading issues (user code vs. system classloaders).

    To create a custom Docker image we create the following Dockerfile:

    FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11
    
    RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \
        wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
        tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
        mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/ && \
        rm -r flink-1.4.2*
    
    RUN mkdir etc-hadoop
    COPY <name of key file>.json etc-hadoop/
    COPY core-site.xml etc-hadoop/
    
    ENTRYPOINT ["/docker-entrypoint.sh"]
    EXPOSE 6123 8081
    CMD ["jobmanager"]
    

    The Docker image will be based on the Flink image we’re providing as part of the dA Platform trial. We are adding the Google Cloud Storage connector, Flink’s Hadoop package, and the service account key together with the configuration file.

    To build the custom image, the following files should be in your current directory: core-site.xml, Dockerfile and the key-file (.json).

    To finally trigger the build of the custom image, we run the following command:

    $ docker build -t flink-1.4.2-gs .
    

    Once the image has been built, we will upload the image to Google’s Container Registry. To configure Docker to properly access the registry, run this command once:

    $ gcloud auth configure-docker
    

    Next, we’ll tag and upload the container:

    $ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
    $ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs
    

    Once the upload has completed, we need to set the custom image for an Application Manager deployment. Send the following PATCH request:

    PATCH /api/v1/deployments/<your AppMgr deployment id>
     spec:
       template:
         spec:
           flinkConfiguration:
             fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
           artifact:
             flinkImageRegistry: eu.gcr.io
             flinkImageRepository: <your project id>/flink-1.4.2-gs
             flinkImageTag: latest
    

    Alternatively, use the following curl command:

    $ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d 'spec:
        template:
          spec:
            flinkConfiguration:
              fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
            artifact:
              flinkImageRegistry: eu.gcr.io
              flinkImageRepository: <your project id>/flink-1.4.2-gs
              flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>'
    

    With this change implemented, you’ll be able to checkpoint to Google Cloud Storage. When specifying the checkpoint directory, use a URI of the form gs://<your bucket>/checkpoints. For savepoints, set the state.savepoints.dir Flink configuration option (see the sketch below).
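
    For reference, a minimal flink-conf.yaml sketch of the checkpoint- and savepoint-related options (the bucket name and paths are placeholders):

    # Checkpoint/savepoint locations on GCS; <your bucket> is a placeholder
    state.backend: rocksdb
    state.checkpoints.dir: gs://<your bucket>/checkpoints
    state.savepoints.dir: gs://<your bucket>/savepoints
    # Directory containing the core-site.xml that configures the gs:// filesystem
    fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/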