google-cloud-storage apache-flink

StreamingFileSink + GCS


I'm currently using Flink 1.7 with the gcs-connector library. I'm trying to have the StreamingFileSink write to GCS buckets and am running into the exception shown below.

I came across this Jira issue, https://issues.apache.org/jira/browse/FLINK-11838, but it wasn't clear to me whether the code was ever merged.

Any help on what needs to be done to get this to work is much appreciated.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-core-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) ~[flink-runtime_2.11-1.7.0.jar:1.7.0]
    at java.lang.Thread.run(Thread.java:834) ~[?:?]



Solution

  • It was not merged, to the best of my knowledge.

    The problem is that for a Flink sink to participate in checkpointing and recovery, it must be able to reset whatever it is writing to back to the state of the last checkpoint. For a sink that writes files, that means it may have to truncate the file to its length at the last checkpoint and then resume appending from there. GCS, like Amazon's S3, is an object store and not a real file system. You can do most things with an object store that you can do with a file system, but you can't truncate an object and you can't append to it. That is what the exception is telling you: the Hadoop recoverable writer only supports HDFS. It is possible to make it appear that you're truncating and appending to objects, but that requires a very inefficient API layer. For that reason, it's really not very efficient to try to use the StreamingFileSink on GCS.

    You're probably better off writing to an actual file system and then adding one last step that transfers the finished files to GCS. That probably means writing another sink so that the transfer is also covered by the checkpoint mechanism, but I think it's your best bet.
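
To make the truncate-and-append requirement and the suggested workaround concrete, here is a minimal sketch in plain Java, with no Flink or GCS dependencies. The class `LocalStagingWriter` and its methods `checkpoint`, `restore` and `finish` are hypothetical names made up for illustration; they are not Flink API. The `uploader` callback is a placeholder for whatever client call you would use to copy the finished file to a bucket. The sketch only shows why a real file system can be rolled back to a checkpoint, and where the final transfer step would go.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: stages records in a local file that can be rolled back, then hands it off. */
class LocalStagingWriter {
    private final Path file;
    private long checkpointedLength = 0L; // file length recorded at the last checkpoint

    LocalStagingWriter(Path file) {
        this.file = file;
    }

    /** Creates an empty temporary staging file. */
    static Path newTempFile() {
        try {
            return Files.createTempFile("staging", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends one record as a UTF-8 line. Appending is something an object store cannot do in place. */
    public void write(String record) {
        try {
            Files.write(file, (record + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Records the current file length; this is the value a checkpoint would need to persist. */
    public long checkpoint() {
        try {
            checkpointedLength = Files.exists(file) ? Files.size(file) : 0L;
            return checkpointedLength;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rolls the file back to the last checkpoint by truncating it to the recorded length. */
    public void restore() {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.truncate(checkpointedLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the staged lines back, for inspection. */
    public List<String> readAll() {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hands the finished file to an uploader (assumption: a GCS client call, not shown here). */
    public void finish(Consumer<Path> uploader) {
        uploader.accept(file);
    }

    public static void main(String[] args) {
        LocalStagingWriter writer = new LocalStagingWriter(newTempFile());
        writer.write("a");
        writer.write("b");
        writer.checkpoint();
        writer.write("c");   // written after the checkpoint
        writer.restore();    // simulated recovery: "c" is discarded
        System.out.println(writer.readAll());
        writer.finish(path -> System.out.println("would upload " + path));
    }
}
```

In a real job the checkpointed length would have to live in Flink's managed state rather than in a field, and the upload would have to be made idempotent so that a replay after failure does not produce duplicate objects. Neither of those is shown here.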