Tags: java, hadoop, google-cloud-storage, google-compute-engine, apache-flink

How to create a RecoverableWriter in Flink for Google Cloud Storage


I wanted to use Google Cloud Storage to write (sink) elements of a DataStream from my streaming job using StreamingFileSink.

To do that, I used the Google Cloud Storage connector for Hadoop as the implementation of org.apache.hadoop.fs.FileSystem, and Flink's HadoopFileSystem as the implementation of org.apache.flink.core.fs.FileSystem that wraps the Hadoop FileSystem class for Flink.
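To make the goal concrete, here is roughly the job I am trying to run (the bucket name and output path are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class GcsSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink finalizes in-progress files on checkpoints
        env.enableCheckpointing(60_000);

        DataStream<String> events = env.fromElements("a", "b", "c");

        // "gs://my-bucket/output" is a placeholder path
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("gs://my-bucket/output"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        events.addSink(sink);
        env.execute("gcs-sink-job");
    }
}
```

With checkpointing enabled, the sink relies on a RecoverableWriter for the target FileSystem, which is where the problem described below comes from.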

I included the following dependencies in my Gradle file:

  • compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
  • compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
  • provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")

Now, from what I understand from the sources [1] [2] [3], Flink dynamically loads implementations of FileSystemFactory at runtime (via java.util.ServiceLoader) and also loads HadoopFsFactory at runtime (via reflection, if it finds Hadoop on the classpath), and then uses these factories to create FileSystem instances.
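As a sanity check, that discovery step can be reproduced outside Flink. The sketch below is simplified and not Flink's actual code; it just lists every FileSystemFactory visible on the classpath via the same ServiceLoader mechanism, which is handy for checking what a cluster actually sees:

```java
import java.util.ServiceLoader;
import org.apache.flink.core.fs.FileSystemFactory;

public class ListFileSystemFactories {
    public static void main(String[] args) {
        // Every factory listed in a jar's
        // META-INF/services/org.apache.flink.core.fs.FileSystemFactory file
        // is instantiated and reported under the scheme it declares.
        for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
            System.out.println(factory.getScheme() + " -> " + factory.getClass().getName());
        }
    }
}
```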

The issue I faced was that the default RecoverableWriter in the Hadoop compatibility package only supports the hdfs file scheme (I use gs) and hence throws an error at runtime.

So, I extended HadoopFileSystem (I called it GCSFileSystem) and overrode FileSystem#createRecoverableWriter() to return a custom implementation of RecoverableWriter, which handles the details of recovery, etc. I also created a corresponding FileSystemFactory class (decorated with @AutoService, so it should be discoverable by ServiceLoader).
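In rough outline it looks like the sketch below. Package and class names, including GCSRecoverableWriter, are placeholders for my own code; only the wiring is shown, not the actual recovery logic:

```java
// GCSFileSystem.java
import java.io.IOException;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

public class GCSFileSystem extends HadoopFileSystem {

    public GCSFileSystem(org.apache.hadoop.fs.FileSystem gcsHadoopFs) {
        super(gcsHadoopFs);
    }

    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // GCSRecoverableWriter is my custom implementation (not shown here);
        // it handles persisting and recovering in-progress writes on GCS.
        return new GCSRecoverableWriter(this);
    }
}

// GCSFileSystemFactory.java
import java.io.IOException;
import java.net.URI;
import com.google.auto.service.AutoService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

// @AutoService generates META-INF/services/org.apache.flink.core.fs.FileSystemFactory,
// which is what ServiceLoader uses to discover the factory at runtime.
@AutoService(FileSystemFactory.class)
public class GCSFileSystemFactory implements FileSystemFactory {

    @Override
    public String getScheme() {
        return "gs";
    }

    @Override
    public void configure(Configuration config) {
        // nothing Flink-specific to configure in this sketch
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // Let the Hadoop GCS connector resolve gs:// URIs, then wrap it.
        org.apache.hadoop.fs.FileSystem hadoopFs =
                org.apache.hadoop.fs.FileSystem.get(fsUri, new org.apache.hadoop.conf.Configuration());
        return new GCSFileSystem(hadoopFs);
    }
}
```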

The setup works well locally and on a local Docker cluster (the GCS connector actually throws an error due to missing authorization, but that's fine, since it means the FileSystem is loaded and running), but it fails when I deploy it to a Docker cluster running on Google Compute Engine.

On GCE, the default HadoopFileSystem gets loaded and throws the exception because the scheme is gs and not hdfs. My assumption was that it should have loaded my implementation of the factory, and thus this error shouldn't have arisen.

I am on Flink v1.6.0, running a long-lived session cluster on Docker using docker-flink.


Solution

  • The answer is in the last line of the OP!!

    I was running a long-lived session cluster, and by the time my job jar was executed the FileSystem initialization had already been done and the factories had already been loaded! So no initialization calls were made when I added my job.

    The solution? There are a few ways depending on how you deploy your job:

    • Standalone: Add the jar containing the FileSystem implementation to the lib/ directory.

    • Cluster (manual): Add the jar containing the FileSystem implementation to the lib/ directory of your distribution zip, image, or whatever you deploy.

    • Cluster (docker)(long-living): Create a custom container image and add the jar to the lib/ directory of that image (see the Dockerfile sketch after this list).

    • Cluster (docker)(per-job-session): Create a custom container image and add all the jars (containing the FileSystem implementation, your job, etc.) to the lib/ directory; read more about per-job sessions here.
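For the Docker variants, a minimal Dockerfile sketch (the base image tag and jar paths are placeholders for whatever you build with docker-flink):

```dockerfile
# The standard Flink image keeps the distribution under /opt/flink, so anything
# copied into /opt/flink/lib is on the JobManager's and TaskManagers' classpath
# when the JVMs start (i.e. before FileSystem initialization happens).
FROM flink:1.6.0-hadoop28-scala_2.11

COPY build/libs/gcs-filesystem.jar /opt/flink/lib/

# Per-job variant: ship the job jar in lib/ as well.
# COPY build/libs/my-job.jar /opt/flink/lib/
```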